Refactor LaterGauge metrics to be homeserver-scoped (#18714)

Part of https://github.com/element-hq/synapse/issues/18592
This commit is contained in:
Eric Eastwood
2025-07-29 13:49:41 -05:00
committed by GitHub
parent 106afe4984
commit 3d683350e9
13 changed files with 143 additions and 104 deletions

1
changelog.d/18714.misc Normal file
View File

@@ -0,0 +1 @@
Refactor `LaterGauge` metrics to be homeserver-scoped.

View File

@@ -54,7 +54,7 @@ from sortedcontainers import SortedDict
from synapse.api.presence import UserPresenceState
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util.metrics import Measure
@@ -113,10 +113,10 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# changes. ARGH.
def register(name: str, queue: Sized) -> None:
LaterGauge(
"synapse_federation_send_queue_%s_size" % (queue_name,),
"",
[],
lambda: len(queue),
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
)
for queue_name in [

View File

@@ -399,31 +399,37 @@ class FederationSender(AbstractFederationSender):
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
"",
[],
lambda: sum(
1
for d in self._per_destination_queues.values()
if d.transmission_loop_running
),
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
if d.transmission_loop_running
)
},
)
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
[],
lambda: sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
),
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
},
)
LaterGauge(
"synapse_federation_transaction_queue_pending_edus",
"",
[],
lambda: sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
),
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
},
)
self._is_processing = False

View File

@@ -780,10 +780,10 @@ class PresenceHandler(BasePresenceHandler):
)
LaterGauge(
"synapse_handlers_presence_user_to_current_state_size",
"",
[],
lambda: len(self.user_to_current_state),
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
)
# The per-device presence state, maps user to devices to per-device presence state.
@@ -883,10 +883,10 @@ class PresenceHandler(BasePresenceHandler):
)
LaterGauge(
"synapse_handlers_presence_wheel_timer_size",
"",
[],
lambda: len(self.wheel_timer),
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
)
# Used to handle sending of presence to newly joined users/servers

View File

@@ -144,27 +144,31 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
# Cast to a list to prevent it changing while the Prometheus
# thread is collecting metrics
with _in_flight_requests_lock:
reqs = list(_in_flight_requests)
request_metrics = list(_in_flight_requests)
for rm in reqs:
rm.update_metrics()
for request_metric in request_metrics:
request_metric.update_metrics()
# Map from (method, name) -> int, the number of in flight requests of that
# type. The key type is Tuple[str, str], but we leave the length unspecified
# for compatability with LaterGauge's annotations.
counts: Dict[Tuple[str, ...], int] = {}
for rm in reqs:
key = (rm.method, rm.name)
for request_metric in request_metrics:
key = (
request_metric.method,
request_metric.name,
request_metric.our_server_name,
)
counts[key] = counts.get(key, 0) + 1
return counts
LaterGauge(
"synapse_http_server_in_flight_requests_count",
"",
["method", "servlet"],
_get_in_flight_counts,
name="synapse_http_server_in_flight_requests_count",
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
caller=_get_in_flight_counts,
)

View File

@@ -156,13 +156,13 @@ class _RegistryProxy:
RegistryProxy = cast(CollectorRegistry, _RegistryProxy)
@attr.s(slots=True, hash=True, auto_attribs=True)
@attr.s(slots=True, hash=True, auto_attribs=True, kw_only=True)
class LaterGauge(Collector):
"""A Gauge which periodically calls a user-provided callback to produce metrics."""
name: str
desc: str
labels: Optional[StrSequence] = attr.ib(hash=False)
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
@@ -170,7 +170,7 @@ class LaterGauge(Collector):
]
def collect(self) -> Iterable[Metric]:
g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)
try:
calls = self.caller()

View File

@@ -29,6 +29,7 @@ from typing import (
Iterable,
List,
Literal,
Mapping,
Optional,
Set,
Tuple,
@@ -263,7 +264,10 @@ class Notifier:
# This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
def count_listeners() -> int:
#
# Ideally, we'd use `Mapping[Tuple[str], int]` here but mypy doesn't like it.
# This is close enough and better than a type ignore.
def count_listeners() -> Mapping[Tuple[str, ...], int]:
all_user_streams: Set[_NotifierUserStream] = set()
for streams in list(self.room_to_user_streams.values()):
@@ -271,18 +275,34 @@ class Notifier:
for stream in list(self.user_to_user_stream.values()):
all_user_streams.add(stream)
return sum(stream.count_listeners() for stream in all_user_streams)
LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
return {
(self.server_name,): sum(
stream.count_listeners() for stream in all_user_streams
)
}
LaterGauge(
"synapse_notifier_rooms",
"",
[],
lambda: count(bool, list(self.room_to_user_streams.values())),
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
)
LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
},
)
LaterGauge(
"synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
)
def add_replication_callback(self, cb: Callable[[], None]) -> None:

View File

@@ -244,10 +244,10 @@ class ReplicationCommandHandler:
self._connections: List[IReplicationConnection] = []
LaterGauge(
"synapse_replication_tcp_resource_total_connections",
"",
[],
lambda: len(self._connections),
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._connections)},
)
# When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -267,11 +267,11 @@ class ReplicationCommandHandler:
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
LaterGauge(
"synapse_replication_tcp_command_queue",
"Number of inbound RDATA/POSITION commands queued for processing",
["stream_name"],
lambda: {
(stream_name,): len(queue)
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
caller=lambda: {
(stream_name, self.server_name): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
},
)

View File

@@ -524,10 +524,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# The following simply registers metrics for the replication connections
pending_commands = LaterGauge(
"synapse_replication_tcp_protocol_pending_commands",
"",
["name"],
lambda: {(p.name,): len(p.pending_commands) for p in connected_connections},
name="synapse_replication_tcp_protocol_pending_commands",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
},
)
@@ -539,10 +541,12 @@ def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int:
transport_send_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_send_buffer",
"",
["name"],
lambda: {(p.name,): transport_buffer_size(p) for p in connected_connections},
name="synapse_replication_tcp_protocol_transport_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
},
)
@@ -564,22 +568,22 @@ def transport_kernel_read_buffer_size(
tcp_transport_kernel_send_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_kernel_send_buffer",
"",
["name"],
lambda: {
(p.name,): transport_kernel_read_buffer_size(p, False)
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
)
tcp_transport_kernel_read_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_kernel_read_buffer",
"",
["name"],
lambda: {
(p.name,): transport_kernel_read_buffer_size(p, True)
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},
)

View File

@@ -588,10 +588,10 @@ class DatabasePool:
self.updates = BackgroundUpdater(hs, self)
LaterGauge(
"synapse_background_update_status",
"Background update status",
[],
self.updates.get_status,
name="synapse_background_update_status",
desc="Background update status",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self.updates.get_status()},
)
self._previous_txn_total_time = 0.0

View File

@@ -43,7 +43,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.logging.opentracing import trace
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
@@ -117,10 +117,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
self._count_known_servers,
)
LaterGauge(
"synapse_federation_known_servers",
"",
[],
lambda: self._known_servers_count,
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self._known_servers_count},
)
@wrap_as_background_process("_count_known_servers")

View File

@@ -119,7 +119,10 @@ def _get_counts_from_rate_limiter_instance(
# Only track metrics if they provided a `metrics_name` to
# differentiate this instance of the rate limiter.
if rate_limiter_instance.metrics_name:
key = (rate_limiter_instance.metrics_name,)
key = (
rate_limiter_instance.metrics_name,
rate_limiter_instance.our_server_name,
)
counts[key] = count_func(rate_limiter_instance)
return counts
@@ -129,10 +132,10 @@ def _get_counts_from_rate_limiter_instance(
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
LaterGauge(
"synapse_rate_limit_sleep_affected_hosts",
"Number of hosts that had requests put to sleep",
["rate_limiter_name"],
lambda: _get_counts_from_rate_limiter_instance(
name="synapse_rate_limit_sleep_affected_hosts",
desc="Number of hosts that had requests put to sleep",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
caller=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_sleep()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
@@ -140,10 +143,10 @@ LaterGauge(
),
)
LaterGauge(
"synapse_rate_limit_reject_affected_hosts",
"Number of hosts that had requests rejected",
["rate_limiter_name"],
lambda: _get_counts_from_rate_limiter_instance(
name="synapse_rate_limit_reject_affected_hosts",
desc="Number of hosts that had requests rejected",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
caller=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_reject()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
@@ -171,6 +174,7 @@ class FederationRateLimiter:
for this rate limiter.
"""
self.our_server_name = our_server_name
self.metrics_name = metrics_name
def new_limiter() -> "_PerHostRatelimiter":

View File

@@ -30,7 +30,7 @@ from synapse.logging.context import (
nested_logging_context,
set_current_context,
)
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -131,10 +131,10 @@ class TaskScheduler:
)
LaterGauge(
"synapse_scheduler_running_tasks",
"The number of concurrent running tasks handled by the TaskScheduler",
labels=None,
caller=lambda: len(self._running_tasks),
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._running_tasks)},
)
def register_action(