Refactor Gauge metrics to be homeserver-scoped (#18725)

Bulk refactor `Gauge` metrics to be homeserver-scoped. We also add lints
to make sure that new `Gauge` metrics don't sneak in without using the
`server_name` label (`SERVER_NAME_LABEL`).

Part of https://github.com/element-hq/synapse/issues/18592
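
The pattern applied throughout is the same: declare the `Gauge` with `SERVER_NAME_LABEL` in its `labelnames`, then pass the homeserver's name whenever the metric is set. A minimal sketch of that pattern (the `my_example_gauge` metric here is hypothetical, not one added by this PR):

```python
from prometheus_client import Gauge

from synapse.metrics import SERVER_NAME_LABEL

# Homeserver-scoped gauge: `server_name` must be one of the labels.
my_example_gauge = Gauge(
    "synapse_my_example",
    "An example homeserver-scoped gauge",
    labelnames=["name", SERVER_NAME_LABEL],
)


def record_value(server_name: str, value: float) -> None:
    # `SERVER_NAME_LABEL` is the string "server_name", so it is passed as a
    # keyword via dict unpacking alongside any other labels.
    my_example_gauge.labels(name="example", **{SERVER_NAME_LABEL: server_name}).set(value)
```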



### Testing strategy

 1. Add a `metrics` listener (or an `http` listener with the `metrics` resource) to your `homeserver.yaml`:
    ```yaml
    listeners:
      # This is just showing how to configure metrics either way
      #
      # `http` `metrics` resource
      - port: 9322
        type: http
        bind_addresses: ['127.0.0.1']
        resources:
          - names: [metrics]
            compress: false
      # `metrics` listener
      - port: 9323
        type: metrics
        bind_addresses: ['127.0.0.1']
    ```
1. Start the homeserver: `poetry run synapse_homeserver --config-path
homeserver.yaml`
1. Fetch `http://localhost:9322/_synapse/metrics` and/or
`http://localhost:9323/metrics`
1. Observe that the response includes the refactored `Gauge` metrics with the
   `server_name` label (a scripted spot-check is sketched below)
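
If you'd rather script the check than eyeball the output, something like the following works against the `http` listener configured above (a sketch; adjust the port and metric name to whatever you want to inspect):

```python
import urllib.request

# Scrape the metrics resource exposed by the `http` listener above.
body = urllib.request.urlopen("http://localhost:9322/_synapse/metrics").read().decode("utf-8")

# Every refactored gauge should now carry a `server_name` label, e.g.
#   synapse_admin_mau_current{server_name="example.com"} 0.0
for line in body.splitlines():
    if line.startswith("synapse_admin_mau_current"):
        print(line)
        assert 'server_name="' in line
```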

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
  file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
  The entry should:
  - Be a short description of your change which makes sense to users.
    "Fixed a bug that prevented receiving messages from other servers."
    instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
  - Feel free to credit yourself, by adding a sentence "Contributed by
    @github_username." or "Contributed by [Your Name]." to the end of the
    entry.
* [x] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct (run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))
Commit f13a136396 (parent 2c236be058) by Eric Eastwood, 2025-07-29 10:37:59 -05:00, committed via GitHub.
29 changed files with 234 additions and 88 deletions.

`changelog.d/18725.misc` (new file):

@@ -0,0 +1 @@
Refactor `Gauge` metrics to be homeserver-scoped.


@@ -4396,7 +4396,7 @@
"exemplar": false,
"expr": "(time() - max without (job, index, host) (avg_over_time(synapse_federation_last_received_pdu_time[10m]))) / 60",
"instant": false,
"legendFormat": "{{server_name}} ",
"legendFormat": "{{origin_server_name}} ",
"range": true,
"refId": "A"
}
@@ -4518,7 +4518,7 @@
"exemplar": false,
"expr": "(time() - max without (job, index, host) (avg_over_time(synapse_federation_last_sent_pdu_time[10m]))) / 60",
"instant": false,
"legendFormat": "{{server_name}}",
"legendFormat": "{{destination_server_name}}",
"range": true,
"refId": "A"
}


@@ -117,6 +117,25 @@ each upgrade are complete before moving on to the next upgrade, to avoid
stacking them up. You can monitor the currently running background updates with
[the Admin API](usage/administration/admin_api/background_updates.html#status).
# Upgrading to v1.136.0
## Metric labels have changed on `synapse_federation_last_received_pdu_time` and `synapse_federation_last_sent_pdu_time`
Previously, the `synapse_federation_last_received_pdu_time` and
`synapse_federation_last_sent_pdu_time` metrics both used the `server_name` label to
differentiate between different servers that we send and receive events from.
Since we're now using the `server_name` label to differentiate between different Synapse
homeserver instances running in the same process, these metrics have been changed as follows:
- `synapse_federation_last_received_pdu_time` now uses the `origin_server_name` label
- `synapse_federation_last_sent_pdu_time` now uses the `destination_server_name` label
The Grafana dashboard JSON in `contrib/grafana/synapse.json` has been updated to reflect
this change but you will need to manually update your own existing Grafana dashboards
using these metrics.
# Upgrading to v1.135.0
## `on_user_registration` module API callback may now run on any worker


@@ -28,7 +28,7 @@ from typing import Callable, Optional, Tuple, Type, Union
import mypy.types
from mypy.erasetype import remove_instance_last_known_values
from mypy.errorcodes import ErrorCode
from mypy.nodes import ARG_NAMED_OPT, ListExpr, NameExpr, TempNode, Var
from mypy.nodes import ARG_NAMED_OPT, ListExpr, NameExpr, TempNode, TupleExpr, Var
from mypy.plugin import (
FunctionLike,
FunctionSigContext,
@@ -61,6 +61,7 @@ class SynapsePlugin(Plugin):
) -> Optional[Callable[[FunctionSigContext], FunctionLike]]:
if fullname in (
"prometheus_client.metrics.Counter",
"prometheus_client.metrics.Gauge",
# TODO: Add other prometheus_client metrics that need checking as we
# refactor, see https://github.com/element-hq/synapse/issues/18592
):
@@ -98,8 +99,8 @@ def check_prometheus_metric_instantiation(ctx: FunctionSigContext) -> CallableTy
ensures metrics are correctly separated by homeserver.
There are also some metrics that apply at the process level, such as CPU usage,
Python garbage collection, Twisted reactor tick time which shouldn't have the
`SERVER_NAME_LABEL`. In those cases, use use a type ignore comment to disable the
Python garbage collection, and Twisted reactor tick time, which shouldn't have the
`SERVER_NAME_LABEL`. In those cases, use a type ignore comment to disable the
check, e.g. `# type: ignore[missing-server-name-label]`.
"""
# The true signature, this isn't being modified so this is what will be returned.
@@ -136,7 +137,7 @@ def check_prometheus_metric_instantiation(ctx: FunctionSigContext) -> CallableTy
# ]
# ```
labelnames_arg_expression = ctx.args[2][0] if len(ctx.args[2]) > 0 else None
if isinstance(labelnames_arg_expression, ListExpr):
if isinstance(labelnames_arg_expression, (ListExpr, TupleExpr)):
# Check if the `labelnames` argument includes the `server_name` label (`SERVER_NAME_LABEL`).
for labelname_expression in labelnames_arg_expression.items:
if (
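
In practice the check means a new `Gauge` (or `Counter`) either lists `SERVER_NAME_LABEL` in its `labelnames` or explicitly opts out with the ignore comment described in the docstring above. A sketch of both cases (the metric names are made up for illustration):

```python
from prometheus_client import Gauge

from synapse.metrics import SERVER_NAME_LABEL

# Passes the check: homeserver-scoped, `labelnames` includes `SERVER_NAME_LABEL`.
per_homeserver_gauge = Gauge(
    "synapse_example_per_homeserver",
    "Example of a homeserver-scoped gauge",
    labelnames=["name", SERVER_NAME_LABEL],
)

# Also fine: a genuinely process-level metric (CPU, GC, reactor tick time, ...)
# opts out explicitly.
process_level_gauge = Gauge(  # type: ignore[missing-server-name-label]
    "synapse_example_process_level",
    "Example of a process-level gauge",
    labelnames=["name"],
)

# Would be flagged with `missing-server-name-label`: labelled, but the labels
# do not include `SERVER_NAME_LABEL`.
# flagged_gauge = Gauge("synapse_example_flagged", "Missing the label", ["name"])
```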


@@ -525,8 +525,12 @@ async def start(hs: "HomeServer") -> None:
)
# Register the threadpools with our metrics.
register_threadpool("default", reactor.getThreadPool())
register_threadpool("gai_resolver", resolver_threadpool)
register_threadpool(
name="default", server_name=server_name, threadpool=reactor.getThreadPool()
)
register_threadpool(
name="gai_resolver", server_name=server_name, threadpool=resolver_threadpool
)
# Set up the SIGHUP machinery.
if hasattr(signal, "SIGHUP"):


@@ -28,6 +28,7 @@ from prometheus_client import Gauge
from twisted.internet import defer
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import (
run_as_background_process,
)
@@ -57,16 +58,25 @@ Phone home stats are sent every 3 hours
_stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_mau_current", "Current MAU")
current_mau_gauge = Gauge(
"synapse_admin_mau_current",
"Current MAU",
labelnames=[SERVER_NAME_LABEL],
)
current_mau_by_service_gauge = Gauge(
"synapse_admin_mau_current_mau_by_service",
"Current MAU by service",
["app_service"],
labelnames=["app_service", SERVER_NAME_LABEL],
)
max_mau_gauge = Gauge(
"synapse_admin_mau_max",
"MAU Limit",
labelnames=[SERVER_NAME_LABEL],
)
max_mau_gauge = Gauge("synapse_admin_mau_max", "MAU Limit")
registered_reserved_users_mau_gauge = Gauge(
"synapse_admin_mau_registered_reserved_users",
"Registered users with reserved threepids",
labelnames=[SERVER_NAME_LABEL],
)
@@ -237,13 +247,21 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
await store.get_monthly_active_count_by_service()
)
reserved_users = await store.get_registered_reserved_users()
current_mau_gauge.set(float(current_mau_count))
current_mau_gauge.labels(**{SERVER_NAME_LABEL: server_name}).set(
float(current_mau_count)
)
for app_service, count in current_mau_count_by_service.items():
current_mau_by_service_gauge.labels(app_service).set(float(count))
current_mau_by_service_gauge.labels(
app_service=app_service, **{SERVER_NAME_LABEL: server_name}
).set(float(count))
registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
max_mau_gauge.set(float(hs.config.server.max_mau_value))
registered_reserved_users_mau_gauge.labels(
**{SERVER_NAME_LABEL: server_name}
).set(float(len(reserved_users)))
max_mau_gauge.labels(**{SERVER_NAME_LABEL: server_name}).set(
float(hs.config.server.max_mau_value)
)
return run_as_background_process(
"generate_monthly_active_users",


@@ -127,7 +127,7 @@ pdu_process_time = Histogram(
last_pdu_ts_metric = Gauge(
"synapse_federation_last_received_pdu_time",
"The timestamp of the last PDU which was successfully received from the given domain",
labelnames=("server_name",),
labelnames=("origin_server_name", SERVER_NAME_LABEL),
)
@@ -554,7 +554,9 @@ class FederationServer(FederationBase):
)
if newest_pdu_ts and origin in self._federation_metrics_domains:
last_pdu_ts_metric.labels(server_name=origin).set(newest_pdu_ts / 1000)
last_pdu_ts_metric.labels(
origin_server_name=origin, **{SERVER_NAME_LABEL: self.server_name}
).set(newest_pdu_ts / 1000)
return pdu_results


@@ -705,10 +705,12 @@ class FederationSender(AbstractFederationSender):
assert ts is not None
synapse.metrics.event_processing_lag.labels(
"federation_sender"
name="federation_sender",
**{SERVER_NAME_LABEL: self.server_name},
).set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"federation_sender"
name="federation_sender",
**{SERVER_NAME_LABEL: self.server_name},
).set(ts)
events_processed_counter.labels(
@@ -726,7 +728,7 @@ class FederationSender(AbstractFederationSender):
).inc()
synapse.metrics.event_processing_positions.labels(
"federation_sender"
name="federation_sender", **{SERVER_NAME_LABEL: self.server_name}
).set(next_token)
finally:


@@ -34,6 +34,7 @@ from synapse.logging.opentracing import (
tags,
whitelisted_homeserver,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.metrics import measure_func
@@ -47,7 +48,7 @@ issue_8631_logger = logging.getLogger("synapse.8631_debug")
last_pdu_ts_metric = Gauge(
"synapse_federation_last_sent_pdu_time",
"The timestamp of the last PDU which was successfully sent to the given domain",
labelnames=("server_name",),
labelnames=("destination_server_name", SERVER_NAME_LABEL),
)
@@ -191,6 +192,7 @@ class TransactionManager:
if pdus and destination in self._federation_metrics_domains:
last_pdu = pdus[-1]
last_pdu_ts_metric.labels(server_name=destination).set(
last_pdu.origin_server_ts / 1000
)
last_pdu_ts_metric.labels(
destination_server_name=destination,
**{SERVER_NAME_LABEL: self.server_name},
).set(last_pdu.origin_server_ts / 1000)


@@ -207,7 +207,8 @@ class ApplicationServicesHandler:
await self.store.set_appservice_last_pos(upper_bound)
synapse.metrics.event_processing_positions.labels(
"appservice_sender"
name="appservice_sender",
**{SERVER_NAME_LABEL: self.server_name},
).set(upper_bound)
events_processed_counter.labels(
@@ -230,10 +231,12 @@ class ApplicationServicesHandler:
assert ts is not None
synapse.metrics.event_processing_lag.labels(
"appservice_sender"
name="appservice_sender",
**{SERVER_NAME_LABEL: self.server_name},
).set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"appservice_sender"
name="appservice_sender",
**{SERVER_NAME_LABEL: self.server_name},
).set(ts)
finally:
self.is_processing = False


@@ -22,7 +22,7 @@ from synapse.api.errors import ShadowBanError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
from synapse.metrics import event_processing_positions
from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.delayed_events import (
ReplicationAddedDelayedEventRestServlet,
@@ -191,7 +191,9 @@ class DelayedEventsHandler:
self._event_pos = max_pos
# Expose current event processing position to prometheus
event_processing_positions.labels("delayed_events").set(max_pos)
event_processing_positions.labels(
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
).set(max_pos)
await self._store.update_delayed_events_stream_pos(max_pos)


@@ -1568,9 +1568,9 @@ class PresenceHandler(BasePresenceHandler):
self._event_pos = max_pos
# Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels("presence").set(
max_pos
)
synapse.metrics.event_processing_positions.labels(
name="presence", **{SERVER_NAME_LABEL: self.server_name}
).set(max_pos)
async def _handle_state_delta(self, room_id: str, deltas: List[StateDelta]) -> None:
"""Process current state deltas for the room to find new joins that need


@@ -49,7 +49,7 @@ from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.push import ReplicationCopyPusherRestServlet
from synapse.storage.databases.main.state_deltas import StateDelta
@@ -2255,7 +2255,9 @@ class RoomForgetterHandler(StateDeltasHandler):
self.pos = max_pos
# Expose current event processing position to prometheus
event_processing_positions.labels("room_forgetter").set(max_pos)
event_processing_positions.labels(
name="room_forgetter", **{SERVER_NAME_LABEL: self.server_name}
).set(max_pos)
await self._store.update_room_forgetter_stream_pos(max_pos)


@@ -32,7 +32,7 @@ from typing import (
)
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.metrics import event_processing_positions
from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import JsonDict
@@ -147,7 +147,9 @@ class StatsHandler:
logger.debug("Handled room stats to %s -> %s", self.pos, max_pos)
event_processing_positions.labels("stats").set(max_pos)
event_processing_positions.labels(
name="stats", **{SERVER_NAME_LABEL: self.server_name}
).set(max_pos)
self.pos = max_pos


@@ -35,6 +35,7 @@ from synapse.api.constants import (
)
from synapse.api.errors import Codes, SynapseError
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.databases.main.user_directory import SearchResult
@@ -262,9 +263,9 @@ class UserDirectoryHandler(StateDeltasHandler):
self.pos = max_pos
# Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels("user_dir").set(
max_pos
)
synapse.metrics.event_processing_positions.labels(
name="user_dir", **{SERVER_NAME_LABEL: self.server_name}
).set(max_pos)
await self.store.update_user_directory_stream_pos(max_pos)


@@ -495,19 +495,27 @@ event_processing_loop_room_count = Counter(
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
event_processing_positions = Gauge(
"synapse_event_processing_positions", "", labelnames=["name", SERVER_NAME_LABEL]
)
# Used to track the current max events stream position
event_persisted_position = Gauge("synapse_event_persisted_position", "")
event_persisted_position = Gauge(
"synapse_event_persisted_position", "", labelnames=[SERVER_NAME_LABEL]
)
# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
event_processing_last_ts = Gauge(
"synapse_event_processing_last_ts", "", labelnames=["name", SERVER_NAME_LABEL]
)
# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
event_processing_lag = Gauge(
"synapse_event_processing_lag", "", labelnames=["name", SERVER_NAME_LABEL]
)
event_processing_lag_by_event = Histogram(
"synapse_event_processing_lag_by_event",
@@ -516,7 +524,11 @@ event_processing_lag_by_event = Histogram(
)
# Build info of the running server.
build_info = Gauge(
#
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`. We
# consider this process-level because all Synapse homeservers running in the process
# will use the same Synapse version.
build_info = Gauge( # type: ignore[missing-server-name-label]
"synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
)
build_info.labels(
@@ -538,38 +550,51 @@ threepid_send_requests = Histogram(
threadpool_total_threads = Gauge(
"synapse_threadpool_total_threads",
"Total number of threads currently in the threadpool",
["name"],
labelnames=["name", SERVER_NAME_LABEL],
)
threadpool_total_working_threads = Gauge(
"synapse_threadpool_working_threads",
"Number of threads currently working in the threadpool",
["name"],
labelnames=["name", SERVER_NAME_LABEL],
)
threadpool_total_min_threads = Gauge(
"synapse_threadpool_min_threads",
"Minimum number of threads configured in the threadpool",
["name"],
labelnames=["name", SERVER_NAME_LABEL],
)
threadpool_total_max_threads = Gauge(
"synapse_threadpool_max_threads",
"Maximum number of threads configured in the threadpool",
["name"],
labelnames=["name", SERVER_NAME_LABEL],
)
def register_threadpool(name: str, threadpool: ThreadPool) -> None:
"""Add metrics for the threadpool."""
def register_threadpool(*, name: str, server_name: str, threadpool: ThreadPool) -> None:
"""
Add metrics for the threadpool.
threadpool_total_min_threads.labels(name).set(threadpool.min)
threadpool_total_max_threads.labels(name).set(threadpool.max)
Args:
name: The name of the threadpool, used to identify it in the metrics.
server_name: The homeserver name (used to label metrics) (this should be `hs.hostname`).
threadpool: The threadpool to register metrics for.
"""
threadpool_total_threads.labels(name).set_function(lambda: len(threadpool.threads))
threadpool_total_working_threads.labels(name).set_function(
lambda: len(threadpool.working)
)
threadpool_total_min_threads.labels(
name=name, **{SERVER_NAME_LABEL: server_name}
).set(threadpool.min)
threadpool_total_max_threads.labels(
name=name, **{SERVER_NAME_LABEL: server_name}
).set(threadpool.max)
threadpool_total_threads.labels(
name=name, **{SERVER_NAME_LABEL: server_name}
).set_function(lambda: len(threadpool.threads))
threadpool_total_working_threads.labels(
name=name, **{SERVER_NAME_LABEL: server_name}
).set_function(lambda: len(threadpool.working))
class MetricsResource(Resource):
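
Since `server_name` is now part of the label set, two homeservers running in the same process each get their own per-threadpool series. A rough sketch with the new keyword-only `register_threadpool` (the hostnames and pools are made up; in Synapse the calls happen during startup, as in the hunks above):

```python
from twisted.python.threadpool import ThreadPool

from synapse.metrics import register_threadpool

# Two homeservers sharing one process register the same threadpool name, but
# the resulting series are kept apart by the `server_name` label.
pool_a = ThreadPool(name="default")
pool_b = ThreadPool(name="default")

register_threadpool(name="default", server_name="hs1.example.com", threadpool=pool_a)
register_threadpool(name="default", server_name="hs2.example.com", threadpool=pool_b)
```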


@@ -54,7 +54,8 @@ running_on_pypy = platform.python_implementation() == "PyPy"
# Python GC metrics
#
gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) # type: ignore[missing-server-name-label]
gc_time = Histogram(
"python_gc_time",
"Time taken to GC (sec)",


@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING
import attr
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
if TYPE_CHECKING:
@@ -33,6 +34,7 @@ from prometheus_client import Gauge
current_dau_gauge = Gauge(
"synapse_admin_daily_active_users",
"Current daily active users count",
labelnames=[SERVER_NAME_LABEL],
)
@@ -89,4 +91,6 @@ class CommonUsageMetricsManager:
"""Update the Prometheus gauges."""
metrics = await self._collect()
current_dau_gauge.set(float(metrics.daily_active_users))
current_dau_gauge.labels(
**{SERVER_NAME_LABEL: self.server_name},
).set(float(metrics.daily_active_users))


@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Dict, Iterable, Optional
from prometheus_client import Gauge
from synapse.api.errors import Codes, SynapseError
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -44,7 +45,9 @@ logger = logging.getLogger(__name__)
synapse_pushers = Gauge(
"synapse_pushers", "Number of active synapse pushers", ["kind", "app_id"]
"synapse_pushers",
"Number of active synapse pushers",
labelnames=["kind", "app_id", SERVER_NAME_LABEL],
)
@@ -420,11 +423,17 @@ class PusherPool:
previous_pusher.on_stop()
synapse_pushers.labels(
type(previous_pusher).__name__, previous_pusher.app_id
kind=type(previous_pusher).__name__,
app_id=previous_pusher.app_id,
**{SERVER_NAME_LABEL: self.server_name},
).dec()
byuser[appid_pushkey] = pusher
synapse_pushers.labels(type(pusher).__name__, pusher.app_id).inc()
synapse_pushers.labels(
kind=type(pusher).__name__,
app_id=pusher.app_id,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
logger.info("Starting pusher %s / %s", pusher.user_id, appid_pushkey)
@@ -476,4 +485,8 @@ class PusherPool:
pusher = byuser.pop(appid_pushkey)
pusher.on_stop()
synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
synapse_pushers.labels(
kind=type(pusher).__name__,
app_id=pusher.app_id,
**{SERVER_NAME_LABEL: self.server_name},
).dec()


@@ -52,7 +52,7 @@ logger = logging.getLogger(__name__)
_pending_outgoing_requests = Gauge(
"synapse_pending_outgoing_replication_requests",
"Number of active outgoing replication requests, by replication method name",
["name"],
labelnames=["name", SERVER_NAME_LABEL],
)
_outgoing_request_counter = Counter(
@@ -213,7 +213,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
instance_map = hs.config.worker.instance_map
outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
outgoing_gauge = _pending_outgoing_requests.labels(
name=cls.NAME,
**{SERVER_NAME_LABEL: server_name},
)
replication_secret = None
if hs.config.worker.worker_replication_secret:


@@ -981,7 +981,10 @@ class HomeServer(metaclass=abc.ABCMeta):
)
# Register the threadpool with our metrics.
register_threadpool("media", media_threadpool)
server_name = self.hostname
register_threadpool(
name="media", server_name=server_name, threadpool=media_threadpool
)
return media_threadpool


@@ -126,9 +126,11 @@ class _PoolConnection(Connection):
def make_pool(
*,
reactor: IReactorCore,
db_config: DatabaseConnectionConfig,
engine: BaseDatabaseEngine,
server_name: str,
) -> adbapi.ConnectionPool:
"""Get the connection pool for the database."""
@@ -152,7 +154,11 @@ def make_pool(
**db_args,
)
register_threadpool(f"database-{db_config.name}", connection_pool.threadpool)
register_threadpool(
name=f"database-{db_config.name}",
server_name=server_name,
threadpool=connection_pool.threadpool,
)
return connection_pool
@@ -573,7 +579,12 @@ class DatabasePool:
self._clock = hs.get_clock()
self._txn_limit = database_config.config.get("txn_limit", 0)
self._database_config = database_config
self._db_pool = make_pool(hs.get_reactor(), database_config, engine)
self._db_pool = make_pool(
reactor=hs.get_reactor(),
db_config=database_config,
engine=engine,
server_name=self.server_name,
)
self.updates = BackgroundUpdater(hs, self)
LaterGauge(


@@ -71,11 +71,13 @@ if TYPE_CHECKING:
oldest_pdu_in_federation_staging = Gauge(
"synapse_federation_server_oldest_inbound_pdu_in_staging",
"The age in seconds since we received the oldest pdu in the federation staging area",
labelnames=[SERVER_NAME_LABEL],
)
number_pdus_in_federation_queue = Gauge(
"synapse_federation_server_number_inbound_pdu_in_staging",
"The total number of events in the inbound federation staging",
labelnames=[SERVER_NAME_LABEL],
)
pdus_pruned_from_federation_queue = Counter(
@@ -2060,8 +2062,12 @@ class EventFederationWorkerStore(
"_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn
)
number_pdus_in_federation_queue.set(count)
oldest_pdu_in_federation_staging.set(age)
number_pdus_in_federation_queue.labels(
**{SERVER_NAME_LABEL: self.server_name}
).set(count)
oldest_pdu_in_federation_staging.labels(
**{SERVER_NAME_LABEL: self.server_name}
).set(age)
async def clean_room_for_join(self, room_id: str) -> None:
await self.db_pool.runInteraction(


@@ -368,7 +368,9 @@ class PersistEventsStore:
if not use_negative_stream_ordering:
# we don't want to set the event_persisted_position to a negative
# stream_ordering.
synapse.metrics.event_persisted_position.set(stream)
synapse.metrics.event_persisted_position.labels(
**{SERVER_NAME_LABEL: self.server_name}
).set(stream)
for event, context in events_and_contexts:
if context.app_service:


@@ -68,6 +68,7 @@ from synapse.logging.opentracing import (
tag_args,
trace,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -138,6 +139,7 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
event_fetch_ongoing_gauge = Gauge(
"synapse_event_fetch_ongoing",
"The number of event fetchers that are running",
labelnames=[SERVER_NAME_LABEL],
)
@@ -312,7 +314,9 @@ class EventsWorkerStore(SQLBaseStore):
Tuple[Iterable[str], "defer.Deferred[Dict[str, _EventRow]]"]
] = []
self._event_fetch_ongoing = 0
event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
event_fetch_ongoing_gauge.labels(**{SERVER_NAME_LABEL: self.server_name}).set(
self._event_fetch_ongoing
)
# We define this sequence here so that it can be referenced from both
# the DataStore and PersistEventStore.
@@ -1140,7 +1144,9 @@ class EventsWorkerStore(SQLBaseStore):
and self._event_fetch_ongoing < EVENT_QUEUE_THREADS
):
self._event_fetch_ongoing += 1
event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
event_fetch_ongoing_gauge.labels(
**{SERVER_NAME_LABEL: self.server_name}
).set(self._event_fetch_ongoing)
# `_event_fetch_ongoing` is decremented in `_fetch_thread`.
should_start = True
else:
@@ -1164,7 +1170,9 @@ class EventsWorkerStore(SQLBaseStore):
event_fetches_to_fail = []
with self._event_fetch_lock:
self._event_fetch_ongoing -= 1
event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
event_fetch_ongoing_gauge.labels(
**{SERVER_NAME_LABEL: self.server_name}
).set(self._event_fetch_ongoing)
# There may still be work remaining in `_event_fetch_list` if we
# failed, or it was added in between us deciding to exit and


@@ -37,6 +37,7 @@ from prometheus_client import Gauge
from twisted.internet import defer
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
@@ -49,19 +50,19 @@ R = TypeVar("R")
number_queued = Gauge(
"synapse_util_batching_queue_number_queued",
"The number of items waiting in the queue across all keys",
labelnames=("name",),
labelnames=("name", SERVER_NAME_LABEL),
)
number_in_flight = Gauge(
"synapse_util_batching_queue_number_pending",
"The number of items across all keys either being processed or waiting in a queue",
labelnames=("name",),
labelnames=("name", SERVER_NAME_LABEL),
)
number_of_keys = Gauge(
"synapse_util_batching_queue_number_of_keys",
"The number of distinct keys that have items queued",
labelnames=("name",),
labelnames=("name", SERVER_NAME_LABEL),
)
@@ -114,14 +115,18 @@ class BatchingQueue(Generic[V, R]):
# The function to call with batches of values.
self._process_batch_callback = process_batch_callback
number_queued.labels(self._name).set_function(
lambda: sum(len(q) for q in self._next_values.values())
number_queued.labels(
name=self._name, **{SERVER_NAME_LABEL: self.server_name}
).set_function(lambda: sum(len(q) for q in self._next_values.values()))
number_of_keys.labels(
name=self._name, **{SERVER_NAME_LABEL: self.server_name}
).set_function(lambda: len(self._next_values))
self._number_in_flight_metric: Gauge = number_in_flight.labels(
name=self._name, **{SERVER_NAME_LABEL: self.server_name}
)
number_of_keys.labels(self._name).set_function(lambda: len(self._next_values))
self._number_in_flight_metric: Gauge = number_in_flight.labels(self._name)
async def add_to_queue(self, value: V, key: Hashable = ()) -> R:
"""Adds the value to the queue with the given key, returning the result
of the processing function for the batch that included the given value.


@@ -43,6 +43,7 @@ from prometheus_client import Gauge
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.metrics import SERVER_NAME_LABEL
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
@@ -50,7 +51,7 @@ from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
cache_pending_metric = Gauge(
"synapse_util_caches_cache_pending",
"Number of lookups currently pending for this cache",
["name"],
labelnames=["name", SERVER_NAME_LABEL],
)
T = TypeVar("T")
@@ -111,7 +112,9 @@ class DeferredCache(Generic[KT, VT]):
] = cache_type()
def metrics_cb() -> None:
cache_pending_metric.labels(name).set(len(self._pending_deferred_cache))
cache_pending_metric.labels(
name=name, **{SERVER_NAME_LABEL: server_name}
).set(len(self._pending_deferred_cache))
# cache is used for completed results and maps to the result itself, rather than
# a Deferred.


@@ -702,6 +702,7 @@ def make_fake_db_pool(
reactor: ISynapseReactor,
db_config: DatabaseConnectionConfig,
engine: BaseDatabaseEngine,
server_name: str,
) -> adbapi.ConnectionPool:
"""Wrapper for `make_pool` which builds a pool which runs db queries synchronously.
@@ -710,7 +711,9 @@ def make_fake_db_pool(
is a drop-in replacement for the normal `make_pool` which builds such a connection
pool.
"""
pool = make_pool(reactor, db_config, engine)
pool = make_pool(
reactor=reactor, db_config=db_config, engine=engine, server_name=server_name
)
def runWithConnection(
func: Callable[..., R], *args: Any, **kwargs: Any


@@ -42,9 +42,9 @@ class BatchingQueueTestCase(TestCase):
# We ensure that we remove any existing metrics for "test_queue".
try:
number_queued.remove("test_queue")
number_of_keys.remove("test_queue")
number_in_flight.remove("test_queue")
number_queued.remove("test_queue", "test_server")
number_of_keys.remove("test_queue", "test_server")
number_in_flight.remove("test_queue", "test_server")
except KeyError:
pass
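
For context, `remove()` takes the label values positionally in the same order as the declared `labelnames`, which is why the test now has to pass both the queue name and the server name. A small sketch of the same idea with a hypothetical gauge:

```python
from prometheus_client import Gauge

from synapse.metrics import SERVER_NAME_LABEL

example_gauge = Gauge(
    "synapse_example_queue_depth",
    "Example gauge labelled by queue name and homeserver",
    labelnames=("name", SERVER_NAME_LABEL),
)

# Create a labelled child, then remove it: the values passed to `remove()`
# must match the order of `labelnames` above.
example_gauge.labels(name="test_queue", **{SERVER_NAME_LABEL: "test_server"}).set(0)
example_gauge.remove("test_queue", "test_server")
```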