Compare commits

...

24 Commits

Author SHA1 Message Date
Eric Eastwood
3e3e11ff7c Fix make_fake_db_pool 2025-07-25 15:59:42 -05:00
Eric Eastwood
bc8c770223 Fix leftover missing self.server_name now that #18656 is merged 2025-07-25 15:45:37 -05:00
Eric Eastwood
12a9a8ce7e Merge branch 'develop' into madlittlemods/18592-refactor-histogram
Conflicts:
	scripts-dev/mypy_synapse_plugin.py
	synapse/http/request_metrics.py
	synapse/replication/tcp/external_cache.py
	synapse/storage/database.py
2025-07-25 15:43:41 -05:00
Eric Eastwood
4c84ffae69 Remove debug log 2025-07-24 15:36:36 -05:00
Eric Eastwood
4d63c55b6d Merge branch 'develop' into madlittlemods/18592-refactor-histogram 2025-07-24 11:27:44 -05:00
Eric Eastwood
4b0a4bbcc3 Add changelog 2025-07-23 16:27:43 -05:00
Eric Eastwood
adf1028fa7 Make self.server_name available for synapse/storage/database.py 2025-07-23 16:27:43 -05:00
Eric Eastwood
99b862e437 Fill in synapse/util/ratelimitutils.py
Wait for https://github.com/element-hq/synapse/pull/18656 to merge
so we have access to `self.server_name`
2025-07-23 16:27:43 -05:00
Eric Eastwood
36155d5cb9 Fill in synapse/storage/controllers/persist_events.py 2025-07-23 16:27:43 -05:00
Eric Eastwood
a1fb7d41ff Fill in synapse/storage/database.py 2025-07-23 16:27:42 -05:00
Eric Eastwood
8fc294413d Fill in synapse/state/__init__.py 2025-07-23 16:27:42 -05:00
Eric Eastwood
4d4ada23c5 Fill in synapse/rest/client/room.py 2025-07-23 16:27:42 -05:00
Eric Eastwood
30f500894d Fill in synapse/replication/tcp/external_cache.py 2025-07-23 16:27:42 -05:00
Eric Eastwood
cba3c659ec Fill in synapse/metrics/_reactor_metrics.py 2025-07-23 16:27:42 -05:00
Eric Eastwood
8c5beeea81 Fill in synapse/metrics/_gc.py 2025-07-23 16:27:42 -05:00
Eric Eastwood
31bcbbdeff Support labelnames argument being a Tuple expression
```
synapse/metrics/__init__.py:522: error: Expected the `labelnames` argument of Histogram to be a list of label names (including `SERVER_NAME_LABEL`), but got TupleExpr:528(
  StrExpr(type)
  StrExpr(reason)
  NameExpr(SERVER_NAME_LABEL [synapse.metrics.SERVER_NAME_LABEL])). If this is a process-level metric (vs homeserver-level), use a type ignore comment to disable this check.  [missing-server-name-label]
```
2025-07-23 16:27:42 -05:00
Eric Eastwood
fa739e2412 Fill in synapse/metrics/__init__.py 2025-07-23 16:27:42 -05:00
Eric Eastwood
1f4cc8f752 Fill in `synapse/http/request_metrics.py`
Wait for https://github.com/element-hq/synapse/pull/18656 to be merged
so we have `self.server_name` available.
2025-07-23 16:27:42 -05:00
Eric Eastwood
68fa7ec81f Fill in synapse/handlers/sliding_sync/__init__.py 2025-07-23 16:27:33 -05:00
Eric Eastwood
04b95acbd2 Fill in synapse/handlers/federation.py 2025-07-23 16:27:33 -05:00
Eric Eastwood
b699e8bec7 Fill in synapse/handlers/federation_event.py 2025-07-23 16:27:33 -05:00
Eric Eastwood
ee223c6b90 Fill in synapse/federation/federation_server.py
Fix `synapse/federation/federation_server.py`
2025-07-23 16:27:32 -05:00
Eric Eastwood
8d209650cd Fill in synapse/api/auth/msc3861_delegated.py 2025-07-23 14:56:39 -05:00
Eric Eastwood
53f9c4c458 Add in base linting for metrics
From https://github.com/element-hq/synapse/pull/18656
2025-07-23 14:51:04 -05:00
29 changed files with 275 additions and 91 deletions

changelog.d/18724.misc Normal file
View File

@@ -0,0 +1 @@
Refactor `Histogram` metrics to be homeserver-scoped.
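
The changelog line above captures the pattern repeated across the file diffs that follow: each homeserver-scoped `Histogram` declares `SERVER_NAME_LABEL` in its `labelnames`, and every call site passes the homeserver's name when observing. A minimal sketch of that shape, using a hypothetical metric and handler rather than anything from this changeset:

```python
# A minimal sketch (not code from this changeset) of the homeserver-scoped
# Histogram pattern applied throughout the diffs below. The metric name and
# handler class are hypothetical.
from prometheus_client import Histogram

from synapse.metrics import SERVER_NAME_LABEL

example_response_timer = Histogram(
    "example_response_time_seconds",
    "Time taken to handle an example request",
    labelnames=["code", SERVER_NAME_LABEL],
)


class ExampleHandler:
    def __init__(self, hs: "HomeServer") -> None:
        # The diffs below obtain the homeserver name from `hs.hostname`.
        self.server_name = hs.hostname

    def record(self, code: int, duration_sec: float) -> None:
        # Every observation now carries the server_name label, passed as a
        # keyword argument via the SERVER_NAME_LABEL constant.
        example_response_timer.labels(
            code=code, **{SERVER_NAME_LABEL: self.server_name}
        ).observe(duration_sec)
```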

View File

@@ -28,7 +28,7 @@ from typing import Callable, Optional, Tuple, Type, Union
import mypy.types
from mypy.erasetype import remove_instance_last_known_values
from mypy.errorcodes import ErrorCode
from mypy.nodes import ARG_NAMED_OPT, ListExpr, NameExpr, TempNode, Var
from mypy.nodes import ARG_NAMED_OPT, ListExpr, NameExpr, TempNode, TupleExpr, Var
from mypy.plugin import (
FunctionLike,
FunctionSigContext,
@@ -61,6 +61,7 @@ class SynapsePlugin(Plugin):
) -> Optional[Callable[[FunctionSigContext], FunctionLike]]:
if fullname in (
"prometheus_client.metrics.Counter",
"prometheus_client.metrics.Histogram",
# TODO: Add other prometheus_client metrics that need checking as we
# refactor, see https://github.com/element-hq/synapse/issues/18592
):
@@ -136,7 +137,7 @@ def check_prometheus_metric_instantiation(ctx: FunctionSigContext) -> CallableTy
# ]
# ```
labelnames_arg_expression = ctx.args[2][0] if len(ctx.args[2]) > 0 else None
if isinstance(labelnames_arg_expression, ListExpr):
if isinstance(labelnames_arg_expression, (ListExpr, TupleExpr)):
# Check if the `labelnames` argument includes the `server_name` label (`SERVER_NAME_LABEL`).
for labelname_expression in labelnames_arg_expression.items:
if (
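
Together with the "Support labelnames argument being a Tuple expression" commit, the plugin change above inspects both list and tuple literals passed as `labelnames`. A hedged sketch, with hypothetical metric names, of what the check is meant to accept and flag:

```python
# Hypothetical metric declarations illustrating the lint once TupleExpr is
# handled alongside ListExpr; none of these metrics exist in the changeset.
from prometheus_client import Histogram

from synapse.metrics import SERVER_NAME_LABEL

# Accepted: list literal that includes SERVER_NAME_LABEL.
ok_list_metric = Histogram(
    "example_list_metric", "sec", labelnames=["verb", SERVER_NAME_LABEL]
)

# Accepted: tuple literal that includes SERVER_NAME_LABEL (this form previously
# produced the TupleExpr error quoted in the commit message).
ok_tuple_metric = Histogram(
    "example_tuple_metric", "sec", labelnames=("type", "reason", SERVER_NAME_LABEL)
)

# Flagged with `missing-server-name-label`: labels omit SERVER_NAME_LABEL.
# bad_metric = Histogram("example_bad_metric", "sec", labelnames=["verb"])
```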

View File

@@ -29,19 +29,21 @@ import attr
from synapse.config._base import (
Config,
ConfigError,
RootConfig,
find_config_files,
read_config_files,
)
from synapse.config.database import DatabaseConfig
from synapse.config.server import ServerConfig
from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn
from synapse.storage.engines import create_engine
class ReviewConfig(RootConfig):
"A config class that just pulls out the database config"
"A config class that just pulls out the server and database config"
config_classes = [DatabaseConfig]
config_classes = [ServerConfig, DatabaseConfig]
@attr.s(auto_attribs=True)
@@ -148,6 +150,10 @@ def main() -> None:
config_dict = read_config_files(config_files)
config.parse_config_dict(config_dict, "", "")
server_name = config.server.server_name
if not isinstance(server_name, str):
raise ConfigError("Must be a string", ("server_name",))
since_ms = time.time() * 1000 - Config.parse_duration(config_args.since)
exclude_users_with_email = config_args.exclude_emails
exclude_users_with_appservice = config_args.exclude_app_service
@@ -159,7 +165,12 @@ def main() -> None:
engine = create_engine(database_config.config)
with make_conn(database_config, engine, "review_recent_signups") as db_conn:
with make_conn(
db_config=database_config,
engine=engine,
default_txn_name="review_recent_signups",
server_name=server_name,
) as db_conn:
# This generates a type of Cursor, not LoggingTransaction.
user_infos = get_recent_users(
db_conn.cursor(),

View File

@@ -672,8 +672,14 @@ class Porter:
engine = create_engine(db_config.config)
hs = MockHomeserver(self.hs_config)
server_name = hs.hostname
with make_conn(db_config, engine, "portdb") as db_conn:
with make_conn(
db_config=db_config,
engine=engine,
default_txn_name="portdb",
server_name=server_name,
) as db_conn:
engine.check_database(
db_conn, allow_outdated_version=allow_outdated_version
)

View File

@@ -47,6 +47,7 @@ from synapse.logging.opentracing import (
inject_request_headers,
start_active_span,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.synapse_rust.http_client import HttpClient
from synapse.types import Requester, UserID, create_requester
from synapse.util import json_decoder
@@ -62,7 +63,7 @@ logger = logging.getLogger(__name__)
introspection_response_timer = Histogram(
"synapse_api_auth_delegated_introspection_response",
"Time taken to get a response for an introspection request",
["code"],
labelnames=["code", SERVER_NAME_LABEL],
)
@@ -341,17 +342,23 @@ class MSC3861DelegatedAuth(BaseAuth):
)
except HttpResponseException as e:
end_time = self._clock.time()
introspection_response_timer.labels(e.code).observe(end_time - start_time)
introspection_response_timer.labels(
code=e.code, **{SERVER_NAME_LABEL: self.server_name}
).observe(end_time - start_time)
raise
except Exception:
end_time = self._clock.time()
introspection_response_timer.labels("ERR").observe(end_time - start_time)
introspection_response_timer.labels(
code="ERR", **{SERVER_NAME_LABEL: self.server_name}
).observe(end_time - start_time)
raise
logger.debug("Fetched token from MAS")
end_time = self._clock.time()
introspection_response_timer.labels(200).observe(end_time - start_time)
introspection_response_timer.labels(
code=200, **{SERVER_NAME_LABEL: self.server_name}
).observe(end_time - start_time)
resp = json_decoder.decode(resp_body.decode("utf-8"))

View File

@@ -122,6 +122,7 @@ received_queries_counter = Counter(
pdu_process_time = Histogram(
"synapse_federation_server_pdu_process_time",
"Time taken to process an event",
labelnames=[SERVER_NAME_LABEL],
)
last_pdu_ts_metric = Gauge(
@@ -1322,9 +1323,9 @@ class FederationServer(FederationBase):
origin, event.event_id
)
if received_ts is not None:
pdu_process_time.observe(
(self._clock.time_msec() - received_ts) / 1000
)
pdu_process_time.labels(
**{SERVER_NAME_LABEL: self.server_name}
).observe((self._clock.time_msec() - received_ts) / 1000)
next = await self._get_next_nonspam_staged_event_for_room(
room_id, room_version

View File

@@ -661,7 +661,8 @@ class FederationSender(AbstractFederationSender):
ts = event_to_received_ts[event.event_id]
assert ts is not None
synapse.metrics.event_processing_lag_by_event.labels(
"federation_sender"
name="federation_sender",
**{SERVER_NAME_LABEL: self.server_name},
).observe((now - ts) / 1000)
async def handle_room_events(events: List[EventBase]) -> None:

View File

@@ -187,7 +187,8 @@ class ApplicationServicesHandler:
assert ts is not None
synapse.metrics.event_processing_lag_by_event.labels(
"appservice_sender"
name="appservice_sender",
**{SERVER_NAME_LABEL: self.server_name},
).observe((now - ts) / 1000)
async def handle_room_events(events: Iterable[EventBase]) -> None:

View File

@@ -71,6 +71,7 @@ from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import NOT_SPAM
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
@@ -90,7 +91,7 @@ logger = logging.getLogger(__name__)
backfill_processing_before_timer = Histogram(
"synapse_federation_backfill_processing_before_time_seconds",
"sec",
[],
labelnames=[SERVER_NAME_LABEL],
buckets=(
0.1,
0.5,
@@ -533,9 +534,9 @@ class FederationHandler:
# backfill points regardless of `current_depth`.
if processing_start_time is not None:
processing_end_time = self.clock.time_msec()
backfill_processing_before_timer.observe(
(processing_end_time - processing_start_time) / 1000
)
backfill_processing_before_timer.labels(
**{SERVER_NAME_LABEL: self.server_name}
).observe((processing_end_time - processing_start_time) / 1000)
success = await try_backfill(likely_domains)
if success:

View File

@@ -113,7 +113,7 @@ soft_failed_event_counter = Counter(
backfill_processing_after_timer = Histogram(
"synapse_federation_backfill_processing_after_time_seconds",
"sec",
[],
labelnames=[SERVER_NAME_LABEL],
buckets=(
0.1,
0.25,
@@ -692,7 +692,9 @@ class FederationEventHandler:
if not events:
return
with backfill_processing_after_timer.time():
with backfill_processing_after_timer.labels(
**{SERVER_NAME_LABEL: self.server_name}
).time():
# if there are any events in the wrong room, the remote server is buggy and
# should not be trusted.
for ev in events:

View File

@@ -38,6 +38,7 @@ from synapse.logging.opentracing import (
tag_args,
trace,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.databases.main.stream import PaginateFunction
@@ -79,7 +80,7 @@ logger = logging.getLogger(__name__)
sync_processing_time = Histogram(
"synapse_sliding_sync_processing_time",
"Time taken to generate a sliding sync response, ignoring wait times.",
["initial"],
labelnames=["initial", SERVER_NAME_LABEL],
)
# Limit the number of state_keys we should remember sending down the connection for each
@@ -94,6 +95,7 @@ MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER = 100
class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self.storage_controllers = hs.get_storage_controllers()
@@ -368,9 +370,9 @@ class SlidingSyncHandler:
set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id)
end_time_s = self.clock.time()
sync_processing_time.labels(from_token is not None).observe(
end_time_s - start_time_s
)
sync_processing_time.labels(
initial=from_token is not None, **{SERVER_NAME_LABEL: self.server_name}
).observe(end_time_s - start_time_s)
return sliding_sync_result

View File

@@ -236,9 +236,11 @@ class RequestMetrics:
response_count.labels(**response_base_labels).inc()
response_timer.labels(code=response_code_str, **response_base_labels).observe(
time_sec - self.start_ts
)
response_timer.labels(
code=response_code_str,
**response_base_labels,
**{SERVER_NAME_LABEL: self.our_server_name},
).observe(time_sec - self.start_ts)
resource_usage = context.get_resource_usage()

View File

@@ -512,7 +512,7 @@ event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
event_processing_lag_by_event = Histogram(
"synapse_event_processing_lag_by_event",
"Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
["name"],
labelnames=["name", SERVER_NAME_LABEL],
)
# Build info of the running server.
@@ -532,7 +532,7 @@ threepid_send_requests = Histogram(
" there is a request with try count of 4, then there would have been one"
" each for 1, 2 and 3",
buckets=(1, 2, 3, 4, 5, 10),
labelnames=("type", "reason"),
labelnames=("type", "reason", SERVER_NAME_LABEL),
)
threadpool_total_threads = Gauge(

View File

@@ -55,7 +55,8 @@ running_on_pypy = platform.python_implementation() == "PyPy"
#
gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
gc_time = Histogram(
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
gc_time = Histogram( # type: ignore[missing-server-name-label]
"python_gc_time",
"Time taken to GC (sec)",
["gen"],

View File

@@ -62,7 +62,8 @@ logger = logging.getLogger(__name__)
# Twisted reactor metrics
#
tick_time = Histogram(
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
tick_time = Histogram( # type: ignore[missing-server-name-label]
"python_twisted_reactor_tick_time",
"Tick time of the Twisted reactor (sec)",
buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
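
The `gc_time` and `tick_time` changes above take the opt-out path named in the plugin's error message: process-level metrics describe the whole Python process rather than a single homeserver, so they keep their existing labels and silence the check with a type-ignore comment. A sketch of that pattern with a hypothetical metric:

```python
# Hypothetical process-level metric; it deliberately omits SERVER_NAME_LABEL
# and suppresses the plugin's error code on the offending call.
from prometheus_client import Histogram

# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
example_process_timer = Histogram(  # type: ignore[missing-server-name-label]
    "example_process_time_seconds",
    "Time taken by an example process-wide operation (sec)",
)
```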

View File

@@ -49,7 +49,7 @@ get_counter = Counter(
response_timer = Histogram(
"synapse_external_cache_response_time_seconds",
"Time taken to get a response from Redis for a cache get/set request",
labelnames=["method"],
labelnames=["method", SERVER_NAME_LABEL],
buckets=(
0.001,
0.002,
@@ -110,7 +110,9 @@ class ExternalCache:
"ExternalCache.set",
tags={opentracing.SynapseTags.CACHE_NAME: cache_name},
):
with response_timer.labels("set").time():
with response_timer.labels(
method="set", **{SERVER_NAME_LABEL: self.server_name}
).time():
return await make_deferred_yieldable(
self._redis_connection.set(
self._get_redis_key(cache_name, key),
@@ -129,7 +131,9 @@ class ExternalCache:
"ExternalCache.get",
tags={opentracing.SynapseTags.CACHE_NAME: cache_name},
):
with response_timer.labels("get").time():
with response_timer.labels(
method="get", **{SERVER_NAME_LABEL: self.server_name}
).time():
result = await make_deferred_yieldable(
self._redis_connection.get(self._get_redis_key(cache_name, key))
)

View File

@@ -47,7 +47,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.http.site import SynapseRequest
from synapse.metrics import threepid_send_requests
from synapse.metrics import SERVER_NAME_LABEL, threepid_send_requests
from synapse.push.mailer import Mailer
from synapse.types import JsonDict
from synapse.types.rest import RequestBodyModel
@@ -76,6 +76,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.server_name = hs.hostname
self.datastore = hs.get_datastores().main
self.config = hs.config
self.identity_handler = hs.get_identity_handler()
@@ -136,9 +137,11 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
self.mailer.send_password_reset_mail,
body.next_link,
)
threepid_send_requests.labels(type="email", reason="password_reset").observe(
body.send_attempt
)
threepid_send_requests.labels(
type="email",
reason="password_reset",
**{SERVER_NAME_LABEL: self.server_name},
).observe(body.send_attempt)
# Wrap the session id in a JSON object
return 200, {"sid": sid}
@@ -325,6 +328,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.server_name = hs.hostname
self.config = hs.config
self.identity_handler = hs.get_identity_handler()
self.store = self.hs.get_datastores().main
@@ -394,9 +398,11 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
body.next_link,
)
threepid_send_requests.labels(type="email", reason="add_threepid").observe(
body.send_attempt
)
threepid_send_requests.labels(
type="email",
reason="add_threepid",
**{SERVER_NAME_LABEL: self.server_name},
).observe(body.send_attempt)
# Wrap the session id in a JSON object
return 200, {"sid": sid}
@@ -407,6 +413,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.server_name = hs.hostname
super().__init__()
self.store = self.hs.get_datastores().main
self.identity_handler = hs.get_identity_handler()
@@ -469,9 +476,11 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
body.next_link,
)
threepid_send_requests.labels(type="msisdn", reason="add_threepid").observe(
body.send_attempt
)
threepid_send_requests.labels(
type="msisdn",
reason="add_threepid",
**{SERVER_NAME_LABEL: self.server_name},
).observe(body.send_attempt)
logger.info("MSISDN %s: got response from identity server: %s", msisdn, ret)
return 200, ret

View File

@@ -56,7 +56,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.http.site import SynapseRequest
from synapse.metrics import threepid_send_requests
from synapse.metrics import SERVER_NAME_LABEL, threepid_send_requests
from synapse.push.mailer import Mailer
from synapse.types import JsonDict
from synapse.util.msisdn import phone_number_to_msisdn
@@ -82,6 +82,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.server_name = hs.hostname
self.identity_handler = hs.get_identity_handler()
self.config = hs.config
@@ -163,9 +164,11 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
next_link,
)
threepid_send_requests.labels(type="email", reason="register").observe(
send_attempt
)
threepid_send_requests.labels(
type="email",
reason="register",
**{SERVER_NAME_LABEL: self.server_name},
).observe(send_attempt)
# Wrap the session id in a JSON object
return 200, {"sid": sid}
@@ -177,6 +180,7 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.server_name = hs.hostname
self.identity_handler = hs.get_identity_handler()
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
@@ -240,9 +244,11 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
next_link,
)
threepid_send_requests.labels(type="msisdn", reason="register").observe(
send_attempt
)
threepid_send_requests.labels(
type="msisdn",
reason="register",
**{SERVER_NAME_LABEL: self.server_name},
).observe(send_attempt)
return 200, ret

View File

@@ -65,6 +65,7 @@ from synapse.http.servlet import (
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
@@ -120,7 +121,7 @@ messsages_response_timer = Histogram(
# picture of /messages response time for bigger rooms. We don't want the
# tiny rooms that can always respond fast skewing our results when we're trying
# to optimize the bigger cases.
["room_size"],
labelnames=["room_size", SERVER_NAME_LABEL],
buckets=(
0.005,
0.01,
@@ -801,6 +802,7 @@ class RoomMessageListRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self._hs = hs
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
@@ -849,7 +851,8 @@ class RoomMessageListRestServlet(RestServlet):
processing_end_time = self.clock.time_msec()
room_member_count = await make_deferred_yieldable(room_member_count_deferred)
messsages_response_timer.labels(
room_size=_RoomSize.from_member_count(room_member_count)
room_size=_RoomSize.from_member_count(room_member_count),
**{SERVER_NAME_LABEL: self.server_name},
).observe((processing_end_time - processing_start_time) / 1000)
return 200, msgs

View File

@@ -75,6 +75,7 @@ metrics_logger = logging.getLogger("synapse.state.metrics")
state_groups_histogram = Histogram(
"synapse_state_number_state_groups_in_resolution",
"Number of state groups used when performing a state resolution",
labelnames=[SERVER_NAME_LABEL],
buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
)
@@ -620,10 +621,12 @@ _biggest_room_by_db_counter = Counter(
_cpu_times = Histogram(
"synapse_state_res_cpu_for_all_rooms_seconds",
"CPU time (utime+stime) spent computing a single state resolution",
labelnames=[SERVER_NAME_LABEL],
)
_db_times = Histogram(
"synapse_state_res_db_for_all_rooms_seconds",
"Database time spent computing a single state resolution",
labelnames=[SERVER_NAME_LABEL],
)
@@ -738,7 +741,9 @@ class StateResolutionHandler:
f"State groups have been deleted: {shortstr(missing_state_groups)}"
)
state_groups_histogram.observe(len(state_groups_ids))
state_groups_histogram.labels(
**{SERVER_NAME_LABEL: self.server_name}
).observe(len(state_groups_ids))
new_state = await self.resolve_events_with_store(
room_id,
@@ -825,8 +830,12 @@ class StateResolutionHandler:
room_metrics.db_time += rusage.db_txn_duration_sec
room_metrics.db_events += rusage.evt_db_fetch_count
_cpu_times.observe(rusage.ru_utime + rusage.ru_stime)
_db_times.observe(rusage.db_txn_duration_sec)
_cpu_times.labels(**{SERVER_NAME_LABEL: self.server_name}).observe(
rusage.ru_utime + rusage.ru_stime
)
_db_times.labels(**{SERVER_NAME_LABEL: self.server_name}).observe(
rusage.db_txn_duration_sec
)
def _report_metrics(self) -> None:
if not self._state_res_metrics:

View File

@@ -106,6 +106,7 @@ state_delta_reuse_delta_counter = Counter(
forward_extremities_counter = Histogram(
"synapse_storage_events_forward_extremities_persisted",
"Number of forward extremities for each new event",
labelnames=[SERVER_NAME_LABEL],
buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
)
@@ -114,6 +115,7 @@ forward_extremities_counter = Histogram(
stale_forward_extremities_counter = Histogram(
"synapse_storage_events_stale_forward_extremities_persisted",
"Number of unchanged forward extremities for each new event",
labelnames=[SERVER_NAME_LABEL],
buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
)
@@ -847,9 +849,13 @@ class EventsPersistenceStorageController:
# We only update metrics for events that change forward extremities
# (e.g. we ignore backfill/outliers/etc)
if result != latest_event_ids:
forward_extremities_counter.observe(len(result))
forward_extremities_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).observe(len(result))
stale = latest_event_ids & result
stale_forward_extremities_counter.observe(len(stale))
stale_forward_extremities_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).observe(len(stale))
return result

View File

@@ -82,9 +82,13 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn")
perf_logger = logging.getLogger("synapse.storage.TIME")
sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")
sql_scheduling_timer = Histogram(
"synapse_storage_schedule_time", "sec", labelnames=[SERVER_NAME_LABEL]
)
sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
sql_query_timer = Histogram(
"synapse_storage_query_time", "sec", labelnames=["verb", SERVER_NAME_LABEL]
)
sql_txn_count = Counter(
"synapse_storage_transaction_time_count",
"sec",
@@ -126,9 +130,11 @@ class _PoolConnection(Connection):
def make_pool(
*,
reactor: IReactorCore,
db_config: DatabaseConnectionConfig,
engine: BaseDatabaseEngine,
server_name: str,
) -> adbapi.ConnectionPool:
"""Get the connection pool for the database."""
@@ -142,7 +148,12 @@ def make_pool(
# etc.
with LoggingContext("db.on_new_connection"):
engine.on_new_connection(
LoggingDatabaseConnection(conn, engine, "on_new_connection")
LoggingDatabaseConnection(
conn=conn,
engine=engine,
default_txn_name="on_new_connection",
server_name=server_name,
)
)
connection_pool = adbapi.ConnectionPool(
@@ -158,9 +169,11 @@ def make_pool(
def make_conn(
*,
db_config: DatabaseConnectionConfig,
engine: BaseDatabaseEngine,
default_txn_name: str,
server_name: str,
) -> "LoggingDatabaseConnection":
"""Make a new connection to the database and return it.
@@ -174,13 +187,18 @@ def make_conn(
if not k.startswith("cp_")
}
native_db_conn = engine.module.connect(**db_params)
db_conn = LoggingDatabaseConnection(native_db_conn, engine, default_txn_name)
db_conn = LoggingDatabaseConnection(
conn=native_db_conn,
engine=engine,
default_txn_name=default_txn_name,
server_name=server_name,
)
engine.on_new_connection(db_conn)
return db_conn
@attr.s(slots=True, auto_attribs=True)
@attr.s(slots=True, auto_attribs=True, kw_only=True)
class LoggingDatabaseConnection:
"""A wrapper around a database connection that returns `LoggingTransaction`
as its cursor class.
@@ -191,6 +209,7 @@ class LoggingDatabaseConnection:
conn: Connection
engine: BaseDatabaseEngine
default_txn_name: str
server_name: str
def cursor(
self,
@@ -204,8 +223,9 @@ class LoggingDatabaseConnection:
txn_name = self.default_txn_name
return LoggingTransaction(
self.conn.cursor(),
txn=self.conn.cursor(),
name=txn_name,
server_name=self.server_name,
database_engine=self.engine,
after_callbacks=after_callbacks,
async_after_callbacks=async_after_callbacks,
@@ -274,6 +294,7 @@ class LoggingTransaction:
__slots__ = [
"txn",
"name",
"server_name",
"database_engine",
"after_callbacks",
"async_after_callbacks",
@@ -282,8 +303,10 @@ class LoggingTransaction:
def __init__(
self,
*,
txn: Cursor,
name: str,
server_name: str,
database_engine: BaseDatabaseEngine,
after_callbacks: Optional[List[_CallbackListEntry]] = None,
async_after_callbacks: Optional[List[_AsyncCallbackListEntry]] = None,
@@ -291,6 +314,7 @@ class LoggingTransaction:
):
self.txn = txn
self.name = name
self.server_name = server_name
self.database_engine = database_engine
self.after_callbacks = after_callbacks
self.async_after_callbacks = async_after_callbacks
@@ -501,7 +525,9 @@ class LoggingTransaction:
finally:
secs = time.time() - start
sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
sql_query_timer.labels(sql.split()[0]).observe(secs)
sql_query_timer.labels(
verb=sql.split()[0], **{SERVER_NAME_LABEL: self.server_name}
).observe(secs)
def close(self) -> None:
self.txn.close()
@@ -573,7 +599,12 @@ class DatabasePool:
self._clock = hs.get_clock()
self._txn_limit = database_config.config.get("txn_limit", 0)
self._database_config = database_config
self._db_pool = make_pool(hs.get_reactor(), database_config, engine)
self._db_pool = make_pool(
reactor=hs.get_reactor(),
db_config=database_config,
engine=engine,
server_name=self.server_name,
)
self.updates = BackgroundUpdater(hs, self)
LaterGauge(
@@ -1020,7 +1051,9 @@ class DatabasePool:
operation_name="db.connection",
):
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
sql_scheduling_timer.labels(
**{SERVER_NAME_LABEL: self.server_name}
).observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)
if self._txn_limit > 0:
@@ -1053,7 +1086,10 @@ class DatabasePool:
)
db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
conn=conn,
engine=self.engine,
default_txn_name="runWithConnection",
server_name=self.server_name,
)
return func(db_conn, *args, **kwargs)
finally:
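
The storage `database.py` hunks above make `make_pool`, `make_conn`, `LoggingDatabaseConnection`, and `LoggingTransaction` take keyword-only arguments and thread `server_name` through them; the remaining diffs update each call site. A minimal sketch of the new `make_conn` call shape (the helper below is illustrative, not part of the changeset):

```python
# Illustrative helper showing the updated keyword-only call shape for
# make_conn; the config, engine, and server_name are assumed to come from the
# caller, exactly as at the call sites rewritten in this changeset.
from synapse.config.database import DatabaseConnectionConfig
from synapse.storage.database import LoggingDatabaseConnection, make_conn
from synapse.storage.engines import BaseDatabaseEngine


def open_logged_connection(
    db_config: DatabaseConnectionConfig,
    engine: BaseDatabaseEngine,
    server_name: str,
) -> LoggingDatabaseConnection:
    # All arguments are keyword-only, and server_name is now required so that
    # per-query metrics on the resulting LoggingTransaction carry the
    # homeserver's name.
    return make_conn(
        db_config=db_config,
        engine=engine,
        default_txn_name="example_txn",
        server_name=server_name,
    )
```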

View File

@@ -69,11 +69,18 @@ class Databases(Generic[DataStoreT]):
state_deletion: Optional[StateDeletionDataStore] = None
persist_events: Optional[PersistEventsStore] = None
server_name = hs.hostname
for database_config in hs.config.database.databases:
db_name = database_config.name
engine = create_engine(database_config.config)
with make_conn(database_config, engine, "startup") as db_conn:
with make_conn(
db_config=database_config,
engine=engine,
default_txn_name="startup",
server_name=server_name,
) as db_conn:
logger.info("[database config %r]: Checking database server", db_name)
engine.check_database(db_conn)

View File

@@ -75,7 +75,7 @@ rate_limit_reject_counter = Counter(
queue_wait_timer = Histogram(
"synapse_rate_limit_queue_wait_time_seconds",
"Amount of time spent waiting for the rate limiter to let our request through.",
["rate_limiter_name"],
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
buckets=(
0.005,
0.01,
@@ -285,7 +285,10 @@ class _PerHostRatelimiter:
async def _on_enter_with_tracing(self, request_id: object) -> None:
maybe_metrics_cm: ContextManager = contextlib.nullcontext()
if self.metrics_name:
maybe_metrics_cm = queue_wait_timer.labels(self.metrics_name).time()
maybe_metrics_cm = queue_wait_timer.labels(
rate_limiter_name=self.metrics_name,
**{SERVER_NAME_LABEL: self.our_server_name},
).time()
with start_active_span("ratelimit wait"), maybe_metrics_cm:
await self._on_enter(request_id)

View File

@@ -97,6 +97,7 @@ from synapse.module_api.callbacks.third_party_event_rules_callbacks import (
load_legacy_third_party_event_rules,
)
from synapse.server import HomeServer
from synapse.server_notices.consent_server_notices import ConfigError
from synapse.storage import DataStore
from synapse.storage.database import LoggingDatabaseConnection, make_pool
from synapse.storage.engines import BaseDatabaseEngine, create_engine
@@ -702,6 +703,7 @@ def make_fake_db_pool(
reactor: ISynapseReactor,
db_config: DatabaseConnectionConfig,
engine: BaseDatabaseEngine,
server_name: str,
) -> adbapi.ConnectionPool:
"""Wrapper for `make_pool` which builds a pool which runs db queries synchronously.
@@ -710,7 +712,9 @@ def make_fake_db_pool(
is a drop-in replacement for the normal `make_pool` which builds such a connection
pool.
"""
pool = make_pool(reactor, db_config, engine)
pool = make_pool(
reactor=reactor, db_config=db_config, engine=engine, server_name=server_name
)
def runWithConnection(
func: Callable[..., R], *args: Any, **kwargs: Any
@@ -1084,12 +1088,19 @@ def setup_test_homeserver(
"args": {"database": test_db_location, "cp_min": 1, "cp_max": 1},
}
server_name = config.server.server_name
if not isinstance(server_name, str):
raise ConfigError("Must be a string", ("server_name",))
# Check if we have set up a DB that we can use as a template.
global PREPPED_SQLITE_DB_CONN
if PREPPED_SQLITE_DB_CONN is None:
temp_engine = create_engine(database_config)
PREPPED_SQLITE_DB_CONN = LoggingDatabaseConnection(
sqlite3.connect(":memory:"), temp_engine, "PREPPED_CONN"
conn=sqlite3.connect(":memory:"),
engine=temp_engine,
default_txn_name="PREPPED_CONN",
server_name=server_name,
)
database = DatabaseConnectionConfig("master", database_config)

View File

@@ -63,9 +63,15 @@ class ApplicationServiceStoreTestCase(unittest.HomeserverTestCase):
self._add_appservice("token3", "as3", "some_url", "some_hs_token", "bob")
# must be done after inserts
database = self.hs.get_datastores().databases[0]
self.server_name = self.hs.hostname
self.store = ApplicationServiceStore(
database,
make_conn(database._database_config, database.engine, "test"),
make_conn(
db_config=database._database_config,
engine=database.engine,
default_txn_name="test",
server_name=self.server_name,
),
self.hs,
)
@@ -138,9 +144,17 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
self.db_pool = database._db_pool
self.engine = database.engine
server_name = self.hs.hostname
db_config = self.hs.config.database.get_single_database()
self.store = TestTransactionStore(
database, make_conn(db_config, self.engine, "test"), self.hs
database,
make_conn(
db_config=db_config,
engine=self.engine,
default_txn_name="test",
server_name=server_name,
),
self.hs,
)
def _add_service(self, url: str, as_token: str, id: str) -> None:
@@ -488,10 +502,16 @@ class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase):
self.hs.config.appservice.app_service_config_files = [f1, f2]
self.hs.config.caches.event_cache_size = 1
server_name = self.hs.hostname
database = self.hs.get_datastores().databases[0]
ApplicationServiceStore(
database,
make_conn(database._database_config, database.engine, "test"),
make_conn(
db_config=database._database_config,
engine=database.engine,
default_txn_name="test",
server_name=server_name,
),
self.hs,
)
@@ -503,10 +523,16 @@ class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase):
self.hs.config.caches.event_cache_size = 1
with self.assertRaises(ConfigError) as cm:
server_name = self.hs.hostname
database = self.hs.get_datastores().databases[0]
ApplicationServiceStore(
database,
make_conn(database._database_config, database.engine, "test"),
make_conn(
db_config=database._database_config,
engine=database.engine,
default_txn_name="test",
server_name=server_name,
),
self.hs,
)
@@ -523,10 +549,16 @@ class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase):
self.hs.config.caches.event_cache_size = 1
with self.assertRaises(ConfigError) as cm:
server_name = self.hs.hostname
database = self.hs.get_datastores().databases[0]
ApplicationServiceStore(
database,
make_conn(database._database_config, database.engine, "test"),
make_conn(
db_config=database._database_config,
engine=database.engine,
default_txn_name="test",
server_name=server_name,
),
self.hs,
)

View File

@@ -69,9 +69,10 @@ class WorkerSchemaTests(HomeserverTestCase):
db_pool = self.hs.get_datastores().main.db_pool
db_conn = LoggingDatabaseConnection(
db_pool._db_pool.connect(),
db_pool.engine,
"tests",
conn=db_pool._db_pool.connect(),
engine=db_pool.engine,
default_txn_name="tests",
server_name="test_server",
)
cur = db_conn.cursor()
@@ -85,9 +86,10 @@ class WorkerSchemaTests(HomeserverTestCase):
"""Test that workers don't start if the DB has an older schema version"""
db_pool = self.hs.get_datastores().main.db_pool
db_conn = LoggingDatabaseConnection(
db_pool._db_pool.connect(),
db_pool.engine,
"tests",
conn=db_pool._db_pool.connect(),
engine=db_pool.engine,
default_txn_name="tests",
server_name="test_server",
)
cur = db_conn.cursor()
@@ -105,9 +107,10 @@ class WorkerSchemaTests(HomeserverTestCase):
"""
db_pool = self.hs.get_datastores().main.db_pool
db_conn = LoggingDatabaseConnection(
db_pool._db_pool.connect(),
db_pool.engine,
"tests",
conn=db_pool._db_pool.connect(),
engine=db_pool.engine,
default_txn_name="tests",
server_name="test_server",
)
# Set the schema version of the database to the current version

View File

@@ -36,8 +36,14 @@ class UnsafeLocaleTest(HomeserverTestCase):
def test_unsafe_locale(self, mock_db_locale: MagicMock) -> None:
mock_db_locale.return_value = ("B", "B")
database = self.hs.get_datastores().databases[0]
server_name = self.hs.hostname
db_conn = make_conn(database._database_config, database.engine, "test_unsafe")
db_conn = make_conn(
db_config=database._database_config,
engine=database.engine,
default_txn_name="test_unsafe",
server_name=server_name,
)
with self.assertRaises(IncorrectDatabaseSetup):
database.engine.check_database(db_conn)
with self.assertRaises(IncorrectDatabaseSetup):
@@ -47,8 +53,14 @@ class UnsafeLocaleTest(HomeserverTestCase):
def test_safe_locale(self) -> None:
database = self.hs.get_datastores().databases[0]
assert isinstance(database.engine, PostgresEngine)
server_name = self.hs.hostname
db_conn = make_conn(database._database_config, database.engine, "test_unsafe")
db_conn = make_conn(
db_config=database._database_config,
engine=database.engine,
default_txn_name="test_unsafe",
server_name=server_name,
)
with db_conn.cursor() as txn:
res = database.engine.get_db_locale(txn)
self.assertEqual(res, ("C", "C"))

View File

@@ -113,7 +113,12 @@ def setupdb() -> None:
port=POSTGRES_PORT,
password=POSTGRES_PASSWORD,
)
logging_conn = LoggingDatabaseConnection(db_conn, db_engine, "tests")
logging_conn = LoggingDatabaseConnection(
conn=db_conn,
engine=db_engine,
default_txn_name="tests",
server_name="test_server",
)
prepare_database(logging_conn, db_engine, None)
logging_conn.close()