Compare commits

...

2 Commits

Author SHA1 Message Date
dependabot[bot]
f7103be392 Bump gitpython from 3.1.44 to 3.1.45
Bumps [gitpython](https://github.com/gitpython-developers/GitPython) from 3.1.44 to 3.1.45.
- [Release notes](https://github.com/gitpython-developers/GitPython/releases)
- [Changelog](https://github.com/gitpython-developers/GitPython/blob/main/CHANGES)
- [Commits](https://github.com/gitpython-developers/GitPython/compare/3.1.44...3.1.45)

---
updated-dependencies:
- dependency-name: gitpython
  dependency-version: 3.1.45
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-28 16:15:46 +00:00
Eric Eastwood
2c236be058 Refactor Counter metrics to be homeserver-scoped (#18656)
Bulk refactor `Counter` metrics to be homeserver-scoped. We also add
lints to make sure that new `Counter` metrics don't sneak in without
using the `server_name` label (`SERVER_NAME_LABEL`).

All of the "Fill in" commits are just bulk refactoring.

Part of https://github.com/element-hq/synapse/issues/18592
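
For illustration, here is a hedged sketch of the pattern this refactor applies everywhere (the metric and label names below are invented for the example, not taken from the diff):

```python
from prometheus_client import Counter

from synapse.metrics import SERVER_NAME_LABEL

# Before: a process-global counter with no homeserver scoping.
# example_counter = Counter("synapse_example_total", "An example", ["type"])

# After: homeserver-scoped counters declare the `server_name` label...
example_counter = Counter(
    "synapse_example_total",
    "An example",
    labelnames=["type", SERVER_NAME_LABEL],
)

# ...and every increment passes the homeserver name alongside the other labels.
example_counter.labels(type="foo", **{SERVER_NAME_LABEL: "hs1.example.com"}).inc()
```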



### Testing strategy

 1. Add the `metrics` listener in your `homeserver.yaml`
    ```yaml
    listeners:
      # This shows both ways to expose metrics
      #
      # `http` `metrics` resource
      - port: 9322
        type: http
        bind_addresses: ['127.0.0.1']
        resources:
          - names: [metrics]
            compress: false
      # `metrics` listener
      - port: 9323
        type: metrics
        bind_addresses: ['127.0.0.1']
    ```
1. Start the homeserver: `poetry run synapse_homeserver --config-path homeserver.yaml`
1. Fetch `http://localhost:9322/_synapse/metrics` and/or `http://localhost:9323/metrics`
1. Observe that the response includes the `synapse_user_registrations_total`, `synapse_http_server_response_count_total`, etc. metrics with the `server_name` label (a scripted check of this step is sketched below)
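
A minimal scripted version of step 4 (assuming the ports from the listener config above and a running homeserver):

```python
import urllib.request

# Scrape the `metrics` listener configured above.
body = urllib.request.urlopen("http://localhost:9323/metrics").read().decode("utf-8")

# Every sample of a homeserver-scoped counter should carry a `server_name` label.
samples = [
    line
    for line in body.splitlines()
    if line.startswith("synapse_user_registrations_total{")
]
assert samples, "expected synapse_user_registrations_total to be exported"
assert all('server_name="' in line for line in samples), samples
print("\n".join(samples))
```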
2025-07-25 14:58:47 -05:00
44 changed files with 779 additions and 265 deletions

1
changelog.d/18656.misc Normal file
View File

@@ -0,0 +1 @@
Refactor `Counter` metrics to be homeserver-scoped.

51
poetry.lock generated
View File

@@ -39,7 +39,7 @@ description = "The ultimate Python library in building OAuth and OpenID Connect
optional = true
python-versions = ">=3.9"
groups = ["main"]
markers = "extra == \"oidc\" or extra == \"jwt\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"jwt\" or extra == \"oidc\""
files = [
{file = "authlib-1.6.1-py2.py3-none-any.whl", hash = "sha256:e9d2031c34c6309373ab845afc24168fe9e93dc52d252631f52642f21f5ed06e"},
{file = "authlib-1.6.1.tar.gz", hash = "sha256:4dffdbb1460ba6ec8c17981a4c67af7d8af131231b5a36a88a1e8c80c111cdfd"},
@@ -435,7 +435,7 @@ description = "XML bomb protection for Python stdlib modules"
optional = true
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61"},
{file = "defusedxml-0.7.1.tar.gz", hash = "sha256:1bb3032db185915b62d7c6209c5a8792be6a32ab2fedacc84e01b52c51aa3e69"},
@@ -478,7 +478,7 @@ description = "XPath 1.0/2.0/3.0/3.1 parsers and selectors for ElementTree and l
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "elementpath-4.1.5-py3-none-any.whl", hash = "sha256:2ac1a2fb31eb22bbbf817f8cf6752f844513216263f0e3892c8e79782fe4bb55"},
{file = "elementpath-4.1.5.tar.gz", hash = "sha256:c2d6dc524b29ef751ecfc416b0627668119d8812441c555d7471da41d4bacb8d"},
@@ -504,18 +504,19 @@ smmap = ">=3.0.1,<6"
[[package]]
name = "gitpython"
version = "3.1.44"
version = "3.1.45"
description = "GitPython is a Python library used to interact with Git repositories"
optional = false
python-versions = ">=3.7"
groups = ["dev"]
files = [
{file = "GitPython-3.1.44-py3-none-any.whl", hash = "sha256:9e0e10cda9bed1ee64bc9a6de50e7e38a9c9943241cd7f585f6df3ed28011110"},
{file = "gitpython-3.1.44.tar.gz", hash = "sha256:c87e30b26253bf5418b01b0660f818967f3c503193838337fe5e573331249269"},
{file = "gitpython-3.1.45-py3-none-any.whl", hash = "sha256:8908cb2e02fb3b93b7eb0f2827125cb699869470432cc885f019b8fd0fccff77"},
{file = "gitpython-3.1.45.tar.gz", hash = "sha256:85b0ee964ceddf211c41b9f27a49086010a190fd8132a24e21f362a4b36a791c"},
]
[package.dependencies]
gitdb = ">=4.0.1,<5"
typing-extensions = {version = ">=3.10.0.2", markers = "python_version < \"3.10\""}
[package.extras]
doc = ["sphinx (>=7.1.2,<7.2)", "sphinx-autodoc-typehints", "sphinx_rtd_theme"]
@@ -528,7 +529,7 @@ description = "Python wrapper for hiredis"
optional = true
python-versions = ">=3.8"
groups = ["main"]
markers = "extra == \"redis\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"redis\""
files = [
{file = "hiredis-3.2.1-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:add17efcbae46c5a6a13b244ff0b4a8fa079602ceb62290095c941b42e9d5dec"},
{file = "hiredis-3.2.1-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:5fe955cc4f66c57df1ae8e5caf4de2925d43b5efab4e40859662311d1bcc5f54"},
@@ -865,7 +866,7 @@ description = "Jaeger Python OpenTracing Tracer implementation"
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"opentracing\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"opentracing\""
files = [
{file = "jaeger-client-4.8.0.tar.gz", hash = "sha256:3157836edab8e2c209bd2d6ae61113db36f7ee399e66b1dcbb715d87ab49bfe0"},
]
@@ -1003,7 +1004,7 @@ description = "A strictly RFC 4510 conforming LDAP V3 pure Python client library
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\""
files = [
{file = "ldap3-2.9.1-py2.py3-none-any.whl", hash = "sha256:5869596fc4948797020d3f03b7939da938778a0f9e2009f7a072ccf92b8e8d70"},
{file = "ldap3-2.9.1.tar.gz", hash = "sha256:f3e7fc4718e3f09dda568b57100095e0ce58633bcabbed8667ce3f8fbaa4229f"},
@@ -1019,7 +1020,7 @@ description = "Powerful and Pythonic XML processing library combining libxml2/li
optional = true
python-versions = ">=3.8"
groups = ["main"]
markers = "extra == \"url-preview\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"url-preview\""
files = [
{file = "lxml-6.0.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:35bc626eec405f745199200ccb5c6b36f202675d204aa29bb52e27ba2b71dea8"},
{file = "lxml-6.0.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:246b40f8a4aec341cbbf52617cad8ab7c888d944bfe12a6abd2b1f6cfb6f6082"},
@@ -1260,7 +1261,7 @@ description = "An LDAP3 auth provider for Synapse"
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\""
files = [
{file = "matrix-synapse-ldap3-0.3.0.tar.gz", hash = "sha256:8bb6517173164d4b9cc44f49de411d8cebdb2e705d5dd1ea1f38733c4a009e1d"},
{file = "matrix_synapse_ldap3-0.3.0-py3-none-any.whl", hash = "sha256:8b4d701f8702551e98cc1d8c20dbed532de5613584c08d0df22de376ba99159d"},
@@ -1493,7 +1494,7 @@ description = "OpenTracing API for Python. See documentation at http://opentraci
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"opentracing\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"opentracing\""
files = [
{file = "opentracing-2.4.0.tar.gz", hash = "sha256:a173117e6ef580d55874734d1fa7ecb6f3655160b8b8974a2a1e98e5ec9c840d"},
]
@@ -1699,7 +1700,7 @@ description = "psycopg2 - Python-PostgreSQL Database Adapter"
optional = true
python-versions = ">=3.8"
groups = ["main"]
markers = "extra == \"postgres\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"postgres\""
files = [
{file = "psycopg2-2.9.10-cp310-cp310-win32.whl", hash = "sha256:5df2b672140f95adb453af93a7d669d7a7bf0a56bcd26f1502329166f4a61716"},
{file = "psycopg2-2.9.10-cp310-cp310-win_amd64.whl", hash = "sha256:c6f7b8561225f9e711a9c47087388a97fdc948211c10a4bccbf0ba68ab7b3b5a"},
@@ -1720,7 +1721,7 @@ description = ".. image:: https://travis-ci.org/chtd/psycopg2cffi.svg?branch=mas
optional = true
python-versions = "*"
groups = ["main"]
markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")"
markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")"
files = [
{file = "psycopg2cffi-2.9.0.tar.gz", hash = "sha256:7e272edcd837de3a1d12b62185eb85c45a19feda9e62fa1b120c54f9e8d35c52"},
]
@@ -1736,7 +1737,7 @@ description = "A Simple library to enable psycopg2 compatability"
optional = true
python-versions = "*"
groups = ["main"]
markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")"
markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")"
files = [
{file = "psycopg2cffi-compat-1.1.tar.gz", hash = "sha256:d25e921748475522b33d13420aad5c2831c743227dc1f1f2585e0fdb5c914e05"},
]
@@ -1996,7 +1997,7 @@ description = "A development tool to measure, monitor and analyze the memory beh
optional = true
python-versions = ">=3.6"
groups = ["main"]
markers = "extra == \"cache-memory\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"cache-memory\""
files = [
{file = "Pympler-1.0.1-py3-none-any.whl", hash = "sha256:d260dda9ae781e1eab6ea15bacb84015849833ba5555f141d2d9b7b7473b307d"},
{file = "Pympler-1.0.1.tar.gz", hash = "sha256:993f1a3599ca3f4fcd7160c7545ad06310c9e12f70174ae7ae8d4e25f6c5d3fa"},
@@ -2056,7 +2057,7 @@ description = "Python implementation of SAML Version 2 Standard"
optional = true
python-versions = ">=3.9,<4.0"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "pysaml2-7.5.0-py3-none-any.whl", hash = "sha256:bc6627cc344476a83c757f440a73fda1369f13b6fda1b4e16bca63ffbabb5318"},
{file = "pysaml2-7.5.0.tar.gz", hash = "sha256:f36871d4e5ee857c6b85532e942550d2cf90ea4ee943d75eb681044bbc4f54f7"},
@@ -2081,7 +2082,7 @@ description = "Extensions to the standard Python datetime module"
optional = true
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
{file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"},
@@ -2109,7 +2110,7 @@ description = "World timezone definitions, modern and historical"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "pytz-2022.7.1-py2.py3-none-any.whl", hash = "sha256:78f4f37d8198e0627c5f1143240bb0206b8691d8d7ac6d78fee88b78733f8c4a"},
{file = "pytz-2022.7.1.tar.gz", hash = "sha256:01a0681c4b9684a28304615eba55d1ab31ae00bf68ec157ec3708a8182dbbcd0"},
@@ -2474,7 +2475,7 @@ description = "Python client for Sentry (https://sentry.io)"
optional = true
python-versions = ">=3.6"
groups = ["main"]
markers = "extra == \"sentry\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"sentry\""
files = [
{file = "sentry_sdk-2.32.0-py2.py3-none-any.whl", hash = "sha256:6cf51521b099562d7ce3606da928c473643abe99b00ce4cb5626ea735f4ec345"},
{file = "sentry_sdk-2.32.0.tar.gz", hash = "sha256:9016c75d9316b0f6921ac14c8cd4fb938f26002430ac5be9945ab280f78bec6b"},
@@ -2662,7 +2663,7 @@ description = "Tornado IOLoop Backed Concurrent Futures"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"opentracing\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"opentracing\""
files = [
{file = "threadloop-1.0.2-py2-none-any.whl", hash = "sha256:5c90dbefab6ffbdba26afb4829d2a9df8275d13ac7dc58dccb0e279992679599"},
{file = "threadloop-1.0.2.tar.gz", hash = "sha256:8b180aac31013de13c2ad5c834819771992d350267bddb854613ae77ef571944"},
@@ -2678,7 +2679,7 @@ description = "Python bindings for the Apache Thrift RPC system"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"opentracing\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"opentracing\""
files = [
{file = "thrift-0.16.0.tar.gz", hash = "sha256:2b5b6488fcded21f9d312aa23c9ff6a0195d0f6ae26ddbd5ad9e3e25dfc14408"},
]
@@ -2740,7 +2741,7 @@ description = "Tornado is a Python web framework and asynchronous networking lib
optional = true
python-versions = ">=3.9"
groups = ["main"]
markers = "extra == \"opentracing\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"opentracing\""
files = [
{file = "tornado-6.5-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:f81067dad2e4443b015368b24e802d0083fecada4f0a4572fdb72fc06e54a9a6"},
{file = "tornado-6.5-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:9ac1cbe1db860b3cbb251e795c701c41d343f06a96049d6274e7c77559117e41"},
@@ -2877,7 +2878,7 @@ description = "non-blocking redis client for python"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"redis\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"redis\""
files = [
{file = "txredisapi-1.4.11-py3-none-any.whl", hash = "sha256:ac64d7a9342b58edca13ef267d4fa7637c1aa63f8595e066801c1e8b56b22d0b"},
{file = "txredisapi-1.4.11.tar.gz", hash = "sha256:3eb1af99aefdefb59eb877b1dd08861efad60915e30ad5bf3d5bf6c5cedcdbc6"},
@@ -3208,7 +3209,7 @@ description = "An XML Schema validator and decoder"
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "xmlschema-2.4.0-py3-none-any.whl", hash = "sha256:dc87be0caaa61f42649899189aab2fd8e0d567f2cf548433ba7b79278d231a4a"},
{file = "xmlschema-2.4.0.tar.gz", hash = "sha256:d74cd0c10866ac609e1ef94a5a69b018ad16e39077bc6393408b40c6babee793"},

View File

@@ -28,8 +28,13 @@ from typing import Callable, Optional, Tuple, Type, Union
import mypy.types
from mypy.erasetype import remove_instance_last_known_values
from mypy.errorcodes import ErrorCode
from mypy.nodes import ARG_NAMED_OPT, TempNode, Var
from mypy.plugin import FunctionSigContext, MethodSigContext, Plugin
from mypy.nodes import ARG_NAMED_OPT, ListExpr, NameExpr, TempNode, Var
from mypy.plugin import (
FunctionLike,
FunctionSigContext,
MethodSigContext,
Plugin,
)
from mypy.typeops import bind_self
from mypy.types import (
AnyType,
@@ -43,8 +48,26 @@ from mypy.types import (
UnionType,
)
PROMETHEUS_METRIC_MISSING_SERVER_NAME_LABEL = ErrorCode(
"missing-server-name-label",
"`SERVER_NAME_LABEL` required in metric",
category="per-homeserver-tenant-metrics",
)
class SynapsePlugin(Plugin):
def get_function_signature_hook(
self, fullname: str
) -> Optional[Callable[[FunctionSigContext], FunctionLike]]:
if fullname in (
"prometheus_client.metrics.Counter",
# TODO: Add other prometheus_client metrics that need checking as we
# refactor, see https://github.com/element-hq/synapse/issues/18592
):
return check_prometheus_metric_instantiation
return None
def get_method_signature_hook(
self, fullname: str
) -> Optional[Callable[[MethodSigContext], CallableType]]:
@@ -65,6 +88,85 @@ class SynapsePlugin(Plugin):
return None
def check_prometheus_metric_instantiation(ctx: FunctionSigContext) -> CallableType:
"""
Ensure that the `prometheus_client` metrics include the `SERVER_NAME_LABEL` label
when instantiated.
This is important because we support multiple Synapse instances running in the same
process, where all metrics share a single global `REGISTRY`. The `server_name` label
ensures metrics are correctly separated by homeserver.
There are also some metrics that apply at the process level, such as CPU usage,
Python garbage collection, and Twisted reactor tick time, which shouldn't have the
`SERVER_NAME_LABEL`. In those cases, use a type ignore comment to disable the
check, e.g. `# type: ignore[missing-server-name-label]`.
"""
# The true signature; this isn't being modified, so this is what will be returned.
signature: CallableType = ctx.default_signature
# Sanity-check that the arguments are still as expected in this version of
# `prometheus_client`, e.g. `Counter(name, documentation, labelnames, ...)`
#
# `signature.arg_names` should be: ["name", "documentation", "labelnames", ...]
if len(signature.arg_names) < 3 or signature.arg_names[2] != "labelnames":
ctx.api.fail(
f"Expected the 3rd argument of {signature.name} to be 'labelnames', but got "
f"{signature.arg_names[2]}",
ctx.context,
)
return signature
# Ensure mypy is passing the correct number of arguments because we are doing some
# dirty indexing into `ctx.args` later on.
assert len(ctx.args) == len(signature.arg_names), (
f"Expected the list of arguments in the {signature.name} signature ({len(signature.arg_names)})"
f"to match the number of arguments from the function signature context ({len(ctx.args)})"
)
# Check if the `labelnames` argument includes `SERVER_NAME_LABEL`
#
# `ctx.args` should look like this:
# ```
# [
# [StrExpr("name")],
# [StrExpr("documentation")],
# [ListExpr([StrExpr("label1"), StrExpr("label2")])]
# ...
# ]
# ```
labelnames_arg_expression = ctx.args[2][0] if len(ctx.args[2]) > 0 else None
if isinstance(labelnames_arg_expression, ListExpr):
# Check if the `labelnames` argument includes the `server_name` label (`SERVER_NAME_LABEL`).
for labelname_expression in labelnames_arg_expression.items:
if (
isinstance(labelname_expression, NameExpr)
and labelname_expression.fullname == "synapse.metrics.SERVER_NAME_LABEL"
):
# Found the `SERVER_NAME_LABEL`, all good!
break
else:
ctx.api.fail(
f"Expected {signature.name} to include `SERVER_NAME_LABEL` in the list of labels. "
"If this is a process-level metric (vs homeserver-level), use a type ignore comment "
"to disable this check.",
ctx.context,
code=PROMETHEUS_METRIC_MISSING_SERVER_NAME_LABEL,
)
else:
ctx.api.fail(
f"Expected the `labelnames` argument of {signature.name} to be a list of label names "
f"(including `SERVER_NAME_LABEL`), but got {labelnames_arg_expression}. "
"If this is a process-level metric (vs homeserver-level), use a type ignore comment "
"to disable this check.",
ctx.context,
code=PROMETHEUS_METRIC_MISSING_SERVER_NAME_LABEL,
)
return signature
return signature
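
As a hedged illustration of what this plugin accepts and rejects (invented names, not code from the diff):

```python
from prometheus_client import Counter

from synapse.metrics import SERVER_NAME_LABEL

# Passes the lint: `labelnames` includes `SERVER_NAME_LABEL`.
ok_counter = Counter(
    "synapse_example_total",
    "A homeserver-scoped metric",
    labelnames=["type", SERVER_NAME_LABEL],
)

# Would fail the lint with `missing-server-name-label`:
# bad_counter = Counter("synapse_example_total", "No server_name label", ["type"])

# Process-level metrics (CPU, GC, reactor tick time) opt out explicitly:
process_counter = Counter(  # type: ignore[missing-server-name-label]
    "synapse_process_example_total",
    "Intentionally not homeserver-scoped",
)
```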
def _get_true_return_type(signature: CallableType) -> mypy.types.Type:
"""
Get the "final" return type of a callable which might return an Awaitable/Deferred.

View File

@@ -48,6 +48,7 @@ from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig, serialize_event
from synapse.http.client import SimpleHttpClient, is_unknown_endpoint
from synapse.logging import opentracing
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import DeviceListUpdates, JsonDict, JsonMapping, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache
@@ -59,29 +60,31 @@ logger = logging.getLogger(__name__)
sent_transactions_counter = Counter(
"synapse_appservice_api_sent_transactions",
"Number of /transactions/ requests sent",
["service"],
labelnames=["service", SERVER_NAME_LABEL],
)
failed_transactions_counter = Counter(
"synapse_appservice_api_failed_transactions",
"Number of /transactions/ requests that failed to send",
["service"],
labelnames=["service", SERVER_NAME_LABEL],
)
sent_events_counter = Counter(
"synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
"synapse_appservice_api_sent_events",
"Number of events sent to the AS",
labelnames=["service", SERVER_NAME_LABEL],
)
sent_ephemeral_counter = Counter(
"synapse_appservice_api_sent_ephemeral",
"Number of ephemeral events sent to the AS",
["service"],
labelnames=["service", SERVER_NAME_LABEL],
)
sent_todevice_counter = Counter(
"synapse_appservice_api_sent_todevice",
"Number of todevice messages sent to the AS",
["service"],
labelnames=["service", SERVER_NAME_LABEL],
)
HOUR_IN_MS = 60 * 60 * 1000
@@ -382,6 +385,7 @@ class ApplicationServiceApi(SimpleHttpClient):
"left": list(device_list_summary.left),
}
labels = {"service": service.id, SERVER_NAME_LABEL: self.server_name}
try:
args = None
if self.config.use_appservice_legacy_authorization:
@@ -399,10 +403,10 @@ class ApplicationServiceApi(SimpleHttpClient):
service.url,
[event.get("event_id") for event in events],
)
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(serialized_events))
sent_ephemeral_counter.labels(service.id).inc(len(ephemeral))
sent_todevice_counter.labels(service.id).inc(len(to_device_messages))
sent_transactions_counter.labels(**labels).inc()
sent_events_counter.labels(**labels).inc(len(serialized_events))
sent_ephemeral_counter.labels(**labels).inc(len(ephemeral))
sent_todevice_counter.labels(**labels).inc(len(to_device_messages))
return True
except CodeMessageException as e:
logger.warning(
@@ -421,7 +425,7 @@ class ApplicationServiceApi(SimpleHttpClient):
ex.args,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
failed_transactions_counter.labels(service.id).inc()
failed_transactions_counter.labels(**labels).inc()
return False
async def claim_client_keys(

View File

@@ -74,6 +74,7 @@ from synapse.federation.transport.client import SendJoinResponse
from synapse.http.client import is_unknown_endpoint
from synapse.http.types import QueryParams
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import JsonDict, StrCollection, UserID, get_domain_from_id
from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
from synapse.util.async_helpers import concurrently_execute
@@ -85,7 +86,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
sent_queries_counter = Counter(
"synapse_federation_client_sent_queries", "", labelnames=["type", SERVER_NAME_LABEL]
)
PDU_RETRY_TIME_MS = 1 * 60 * 1000
@@ -209,7 +212,10 @@ class FederationClient(FederationBase):
Returns:
The JSON object from the response
"""
sent_queries_counter.labels(query_type).inc()
sent_queries_counter.labels(
type=query_type,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
return await self.transport_layer.make_query(
destination,
@@ -231,7 +237,10 @@ class FederationClient(FederationBase):
Returns:
The JSON object from the response
"""
sent_queries_counter.labels("client_device_keys").inc()
sent_queries_counter.labels(
type="client_device_keys",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
return await self.transport_layer.query_client_keys(
destination, content, timeout
)
@@ -242,7 +251,10 @@ class FederationClient(FederationBase):
"""Query the device keys for a list of user ids hosted on a remote
server.
"""
sent_queries_counter.labels("user_devices").inc()
sent_queries_counter.labels(
type="user_devices",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
return await self.transport_layer.query_user_devices(
destination, user_id, timeout
)
@@ -264,7 +276,10 @@ class FederationClient(FederationBase):
Returns:
The JSON object from the response
"""
sent_queries_counter.labels("client_one_time_keys").inc()
sent_queries_counter.labels(
type="client_one_time_keys",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
# Convert the query with counts into a stable and unstable query and check
# if attempting to claim more than 1 OTK.

View File

@@ -82,6 +82,7 @@ from synapse.logging.opentracing import (
tag_args,
trace,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -104,12 +105,18 @@ TRANSACTION_CONCURRENCY_LIMIT = 10
logger = logging.getLogger(__name__)
received_pdus_counter = Counter("synapse_federation_server_received_pdus", "")
received_pdus_counter = Counter(
"synapse_federation_server_received_pdus", "", labelnames=[SERVER_NAME_LABEL]
)
received_edus_counter = Counter("synapse_federation_server_received_edus", "")
received_edus_counter = Counter(
"synapse_federation_server_received_edus", "", labelnames=[SERVER_NAME_LABEL]
)
received_queries_counter = Counter(
"synapse_federation_server_received_queries", "", ["type"]
"synapse_federation_server_received_queries",
"",
labelnames=["type", SERVER_NAME_LABEL],
)
pdu_process_time = Histogram(
@@ -434,7 +441,9 @@ class FederationServer(FederationBase):
report back to the sending server.
"""
received_pdus_counter.inc(len(transaction.pdus))
received_pdus_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc(
len(transaction.pdus)
)
origin_host, _ = parse_server_name(origin)
@@ -553,7 +562,7 @@ class FederationServer(FederationBase):
"""Process the EDUs in a received transaction."""
async def _process_edu(edu_dict: JsonDict) -> None:
received_edus_counter.inc()
received_edus_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()
edu = Edu(
origin=origin,
@@ -668,7 +677,10 @@ class FederationServer(FederationBase):
async def on_query_request(
self, query_type: str, args: Dict[str, str]
) -> Tuple[int, Dict[str, Any]]:
received_queries_counter.labels(query_type).inc()
received_queries_counter.labels(
type=query_type,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
resp = await self.registry.on_query(query_type, args)
return 200, resp

View File

@@ -160,6 +160,7 @@ from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
SERVER_NAME_LABEL,
LaterGauge,
event_processing_loop_counter,
event_processing_loop_room_count,
@@ -189,11 +190,13 @@ logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations_count",
"Number of PDUs queued for sending to one or more destinations",
labelnames=[SERVER_NAME_LABEL],
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations",
"Total number of PDUs queued for sending across all destinations",
labelnames=[SERVER_NAME_LABEL],
)
# Time (in s) to wait before trying to wake up destinations that have
@@ -708,13 +711,19 @@ class FederationSender(AbstractFederationSender):
"federation_sender"
).set(ts)
events_processed_counter.inc(len(event_entries))
events_processed_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc(len(event_entries))
event_processing_loop_room_count.labels("federation_sender").inc(
len(events_by_room)
)
event_processing_loop_room_count.labels(
name="federation_sender",
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(events_by_room))
event_processing_loop_counter.labels("federation_sender").inc()
event_processing_loop_counter.labels(
name="federation_sender",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
synapse.metrics.event_processing_positions.labels(
"federation_sender"
@@ -735,8 +744,12 @@ class FederationSender(AbstractFederationSender):
if not destinations:
return
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
sent_pdus_destination_dist_total.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc(len(destinations))
sent_pdus_destination_dist_count.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
assert pdu.internal_metadata.stream_ordering

View File

@@ -40,7 +40,7 @@ from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics import SERVER_NAME_LABEL, sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict, ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -56,13 +56,15 @@ logger = logging.getLogger(__name__)
sent_edus_counter = Counter(
"synapse_federation_client_sent_edus", "Total number of EDUs successfully sent"
"synapse_federation_client_sent_edus",
"Total number of EDUs successfully sent",
labelnames=[SERVER_NAME_LABEL],
)
sent_edus_by_type = Counter(
"synapse_federation_client_sent_edus_by_type",
"Number of sent EDUs successfully sent, by event type",
["type"],
labelnames=["type", SERVER_NAME_LABEL],
)
@@ -368,10 +370,17 @@ class PerDestinationQueue:
self._destination, pending_pdus, pending_edus
)
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
sent_transactions_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
sent_edus_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
sent_edus_by_type.labels(
type=edu.edu_type,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
except NotRetryingDestination as e:
logger.debug(
@@ -596,7 +605,9 @@ class PerDestinationQueue:
self._destination, room_catchup_pdus, []
)
sent_transactions_counter.inc()
sent_transactions_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
# We pulled this from the DB, so it'll be non-null
assert pdu.internal_metadata.stream_ordering

View File

@@ -42,6 +42,7 @@ from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
SERVER_NAME_LABEL,
event_processing_loop_counter,
event_processing_loop_room_count,
)
@@ -68,7 +69,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
events_processed_counter = Counter(
"synapse_handlers_appservice_events_processed", "", labelnames=[SERVER_NAME_LABEL]
)
class ApplicationServicesHandler:
@@ -207,13 +210,19 @@ class ApplicationServicesHandler:
"appservice_sender"
).set(upper_bound)
events_processed_counter.inc(len(events))
events_processed_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc(len(events))
event_processing_loop_room_count.labels("appservice_sender").inc(
len(events_by_room)
)
event_processing_loop_room_count.labels(
name="appservice_sender",
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(events_by_room))
event_processing_loop_counter.labels("appservice_sender").inc()
event_processing_loop_counter.labels(
name="appservice_sender",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
if events:
now = self.clock.time_msec()

View File

@@ -70,6 +70,7 @@ from synapse.http import get_request_user_agent
from synapse.http.server import finish_request, respond_with_html
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.registration import (
LoginTokenExpired,
@@ -95,7 +96,7 @@ INVALID_USERNAME_OR_PASSWORD = "Invalid username or password"
invalid_login_token_counter = Counter(
"synapse_user_login_invalid_login_tokens",
"Counts the number of rejected m.login.token on /login",
["reason"],
labelnames=["reason", SERVER_NAME_LABEL],
)
@@ -1478,11 +1479,20 @@ class AuthHandler:
try:
return await self.store.consume_login_token(login_token)
except LoginTokenExpired:
invalid_login_token_counter.labels("expired").inc()
invalid_login_token_counter.labels(
reason="expired",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
except LoginTokenReused:
invalid_login_token_counter.labels("reused").inc()
invalid_login_token_counter.labels(
reason="reused",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
except NotFoundError:
invalid_login_token_counter.labels("not found").inc()
invalid_login_token_counter.labels(
reason="not found",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
raise AuthError(403, "Invalid login token", errcode=Codes.FORBIDDEN)

View File

@@ -1444,6 +1444,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
def __init__(self, hs: "HomeServer", device_handler: DeviceWriterHandler):
super().__init__(hs)
self.server_name = hs.hostname
self.federation = hs.get_federation_client()
self.server_name = hs.hostname # nb must be called this for @measure_func
self.clock = hs.get_clock() # nb must be called this for @measure_func

View File

@@ -76,6 +76,7 @@ from synapse.logging.opentracing import (
tag_args,
trace,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
@@ -105,6 +106,7 @@ logger = logging.getLogger(__name__)
soft_failed_event_counter = Counter(
"synapse_federation_soft_failed_events_total",
"Events received over federation that we marked as soft_failed",
labelnames=[SERVER_NAME_LABEL],
)
# Added to debug performance and track progress on optimizations
@@ -2051,7 +2053,9 @@ class FederationEventHandler:
"hs": origin,
},
)
soft_failed_event_counter.inc()
soft_failed_event_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
event.internal_metadata.soft_failed = True
async def _load_or_fetch_auth_events_for_event(

View File

@@ -105,7 +105,7 @@ from synapse.api.presence import UserDevicePresenceState, UserPresenceState
from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -137,24 +137,40 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
notified_presence_counter = Counter(
"synapse_handler_presence_notified_presence", "", labelnames=[SERVER_NAME_LABEL]
)
federation_presence_out_counter = Counter(
"synapse_handler_presence_federation_presence_out", ""
"synapse_handler_presence_federation_presence_out",
"",
labelnames=[SERVER_NAME_LABEL],
)
presence_updates_counter = Counter(
"synapse_handler_presence_presence_updates", "", labelnames=[SERVER_NAME_LABEL]
)
timers_fired_counter = Counter(
"synapse_handler_presence_timers_fired", "", labelnames=[SERVER_NAME_LABEL]
)
presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
federation_presence_counter = Counter(
"synapse_handler_presence_federation_presence", ""
"synapse_handler_presence_federation_presence", "", labelnames=[SERVER_NAME_LABEL]
)
bump_active_time_counter = Counter(
"synapse_handler_presence_bump_active_time", "", labelnames=[SERVER_NAME_LABEL]
)
bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
get_updates_counter = Counter(
"synapse_handler_presence_get_updates", "", labelnames=["type", SERVER_NAME_LABEL]
)
notify_reason_counter = Counter(
"synapse_handler_presence_notify_reason", "", ["locality", "reason"]
"synapse_handler_presence_notify_reason",
"",
labelnames=["locality", "reason", SERVER_NAME_LABEL],
)
state_transition_counter = Counter(
"synapse_handler_presence_state_transition", "", ["locality", "from", "to"]
"synapse_handler_presence_state_transition",
"",
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
@@ -668,7 +684,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
old_state = self.user_to_current_state.get(new_state.user_id)
self.user_to_current_state[new_state.user_id] = new_state
is_mine = self.is_mine_id(new_state.user_id)
if not old_state or should_notify(old_state, new_state, is_mine):
if not old_state or should_notify(
old_state, new_state, is_mine, self.server_name
):
state_to_notify.append(new_state)
stream_id = token
@@ -977,6 +995,7 @@ class PresenceHandler(BasePresenceHandler):
prev_state,
new_state,
is_mine=self.is_mine_id(user_id),
our_server_name=self.server_name,
wheel_timer=self.wheel_timer,
now=now,
# When overriding disabled presence, don't kick off all the
@@ -996,10 +1015,14 @@ class PresenceHandler(BasePresenceHandler):
# TODO: We should probably ensure there are no races hereafter
presence_updates_counter.inc(len(new_states))
presence_updates_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc(len(new_states))
if to_notify:
notified_presence_counter.inc(len(to_notify))
notified_presence_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc(len(to_notify))
await self._persist_and_notify(list(to_notify.values()))
self.unpersisted_users_changes |= {s.user_id for s in new_states}
@@ -1018,7 +1041,9 @@ class PresenceHandler(BasePresenceHandler):
if user_id not in to_notify
}
if to_federation_ping:
federation_presence_out_counter.inc(len(to_federation_ping))
federation_presence_out_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc(len(to_federation_ping))
hosts_to_states = await get_interested_remotes(
self.store,
@@ -1068,7 +1093,9 @@ class PresenceHandler(BasePresenceHandler):
for user_id in users_to_check
]
timers_fired_counter.inc(len(states))
timers_fired_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc(
len(states)
)
# Set of user ID & device IDs which are currently syncing.
syncing_user_devices = {
@@ -1102,7 +1129,7 @@ class PresenceHandler(BasePresenceHandler):
user_id = user.to_string()
bump_active_time_counter.inc()
bump_active_time_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()
now = self.clock.time_msec()
@@ -1354,7 +1381,9 @@ class PresenceHandler(BasePresenceHandler):
updates.append(prev_state.copy_and_replace(**new_fields))
if updates:
federation_presence_counter.inc(len(updates))
federation_presence_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc(len(updates))
await self._update_states(updates)
async def set_state(
@@ -1667,7 +1696,10 @@ class PresenceHandler(BasePresenceHandler):
def should_notify(
old_state: UserPresenceState, new_state: UserPresenceState, is_mine: bool
old_state: UserPresenceState,
new_state: UserPresenceState,
is_mine: bool,
our_server_name: str,
) -> bool:
"""Decides if a presence state change should be sent to interested parties."""
user_location = "remote"
@@ -1678,19 +1710,38 @@ def should_notify(
return False
if old_state.status_msg != new_state.status_msg:
notify_reason_counter.labels(user_location, "status_msg_change").inc()
notify_reason_counter.labels(
locality=user_location,
reason="status_msg_change",
**{SERVER_NAME_LABEL: our_server_name},
).inc()
return True
if old_state.state != new_state.state:
notify_reason_counter.labels(user_location, "state_change").inc()
notify_reason_counter.labels(
locality=user_location,
reason="state_change",
**{SERVER_NAME_LABEL: our_server_name},
).inc()
state_transition_counter.labels(
user_location, old_state.state, new_state.state
**{
"locality": user_location,
# `from` is a reserved word in Python so we have to label it this way if
# we want to use keyword args.
"from": old_state.state,
"to": new_state.state,
SERVER_NAME_LABEL: our_server_name,
},
).inc()
return True
if old_state.state == PresenceState.ONLINE:
if new_state.currently_active != old_state.currently_active:
notify_reason_counter.labels(user_location, "current_active_change").inc()
notify_reason_counter.labels(
locality=user_location,
reason="current_active_change",
**{SERVER_NAME_LABEL: our_server_name},
).inc()
return True
if (
@@ -1700,14 +1751,18 @@ def should_notify(
# Only notify about last active bumps if we're not currently active
if not new_state.currently_active:
notify_reason_counter.labels(
user_location, "last_active_change_online"
locality=user_location,
reason="last_active_change_online",
**{SERVER_NAME_LABEL: our_server_name},
).inc()
return True
elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Always notify for a transition where last active gets bumped.
notify_reason_counter.labels(
user_location, "last_active_change_not_online"
locality=user_location,
reason="last_active_change_not_online",
**{SERVER_NAME_LABEL: our_server_name},
).inc()
return True
@@ -1774,6 +1829,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
self.server_name = hs.hostname
self.get_presence_handler = hs.get_presence_handler
self.get_presence_router = hs.get_presence_router
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
@@ -1885,7 +1941,10 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# If we have the full list of changes for presence we can
# simply check which ones share a room with the user.
get_updates_counter.labels("stream").inc()
get_updates_counter.labels(
type="stream",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
sharing_users = await self.store.do_users_share_a_room(
user_id, updated_users
@@ -1898,7 +1957,10 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.labels("full").inc()
get_updates_counter.labels(
type="full",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
users_interested_in = (
await self.store.get_users_who_share_room_with_user(user_id)
@@ -2148,6 +2210,7 @@ def handle_update(
prev_state: UserPresenceState,
new_state: UserPresenceState,
is_mine: bool,
our_server_name: str,
wheel_timer: WheelTimer,
now: int,
persist: bool,
@@ -2160,6 +2223,7 @@ def handle_update(
prev_state
new_state
is_mine: Whether the user is ours
our_server_name: The homeserver name of our server (`hs.hostname`)
wheel_timer
now: Time now in ms
persist: True if this state should persist until another update occurs.
@@ -2228,7 +2292,7 @@ def handle_update(
)
# Check whether the change was something worth notifying about
if should_notify(prev_state, new_state, is_mine):
if should_notify(prev_state, new_state, is_mine, our_server_name):
new_state = new_state.copy_and_replace(last_federation_update_ts=now)
persist_and_notify = True

View File

@@ -45,6 +45,7 @@ from synapse.api.errors import (
from synapse.appservice import ApplicationService
from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import assert_params_in_dict
from synapse.metrics import SERVER_NAME_LABEL
from synapse.replication.http.login import RegisterDeviceReplicationServlet
from synapse.replication.http.register import (
ReplicationPostRegisterActionsServlet,
@@ -62,29 +63,38 @@ logger = logging.getLogger(__name__)
registration_counter = Counter(
"synapse_user_registrations_total",
"Number of new users registered (since restart)",
["guest", "shadow_banned", "auth_provider"],
labelnames=["guest", "shadow_banned", "auth_provider", SERVER_NAME_LABEL],
)
login_counter = Counter(
"synapse_user_logins_total",
"Number of user logins (since restart)",
["guest", "auth_provider"],
labelnames=["guest", "auth_provider", SERVER_NAME_LABEL],
)
def init_counters_for_auth_provider(auth_provider_id: str) -> None:
def init_counters_for_auth_provider(auth_provider_id: str, server_name: str) -> None:
"""Ensure the prometheus counters for the given auth provider are initialised
This fixes a problem where the counters are not reported for a given auth provider
until the user first logs in/registers.
Args:
auth_provider_id: The ID of the auth provider to initialise counters for.
server_name: Our server name (used to label metrics) (this should be `hs.hostname`).
"""
for is_guest in (True, False):
login_counter.labels(guest=is_guest, auth_provider=auth_provider_id)
login_counter.labels(
guest=is_guest,
auth_provider=auth_provider_id,
**{SERVER_NAME_LABEL: server_name},
)
for shadow_banned in (True, False):
registration_counter.labels(
guest=is_guest,
shadow_banned=shadow_banned,
auth_provider=auth_provider_id,
**{SERVER_NAME_LABEL: server_name},
)
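
(The trick being relied on here, sketched with invented names: merely selecting a labelled child with `.labels(...)` registers it, so it is exported as `0` before the first increment.)

```python
from prometheus_client import Counter, generate_latest

demo_counter = Counter("demo_logins_total", "Example", labelnames=["guest"])

# Selecting the child initialises it at 0, so it appears in scrapes
# before anything has incremented it.
demo_counter.labels(guest="False")

# Output includes: demo_logins_total{guest="False"} 0.0
print(generate_latest().decode("utf-8"))
```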
@@ -97,6 +107,7 @@ class LoginDict(TypedDict):
class RegistrationHandler:
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
@@ -112,7 +123,6 @@ class RegistrationHandler:
self._account_validity_handler = hs.get_account_validity_handler()
self._user_consent_version = self.hs.config.consent.user_consent_version
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
self._server_name = hs.hostname
self._user_types_config = hs.config.user_types
self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
@@ -138,7 +148,9 @@ class RegistrationHandler:
)
self.refresh_token_lifetime = hs.config.registration.refresh_token_lifetime
init_counters_for_auth_provider("")
init_counters_for_auth_provider(
auth_provider_id="", server_name=self.server_name
)
async def check_username(
self,
@@ -362,6 +374,7 @@ class RegistrationHandler:
guest=make_guest,
shadow_banned=shadow_banned,
auth_provider=(auth_provider_id or ""),
**{SERVER_NAME_LABEL: self.server_name},
).inc()
# If the user does not need to consent at registration, auto-join any
@@ -422,7 +435,7 @@ class RegistrationHandler:
if self.hs.config.registration.auto_join_user_id:
fake_requester = create_requester(
self.hs.config.registration.auto_join_user_id,
authenticated_entity=self._server_name,
authenticated_entity=self.server_name,
)
# If the room requires an invite, add the user to the list of invites.
@@ -435,7 +448,7 @@ class RegistrationHandler:
requires_join = True
else:
fake_requester = create_requester(
user_id, authenticated_entity=self._server_name
user_id, authenticated_entity=self.server_name
)
# Choose whether to federate the new room.
@@ -467,7 +480,7 @@ class RegistrationHandler:
await room_member_handler.update_membership(
requester=create_requester(
user_id, authenticated_entity=self._server_name
user_id, authenticated_entity=self.server_name
),
target=UserID.from_string(user_id),
room_id=room_id,
@@ -493,7 +506,7 @@ class RegistrationHandler:
if requires_join:
await room_member_handler.update_membership(
requester=create_requester(
user_id, authenticated_entity=self._server_name
user_id, authenticated_entity=self.server_name
),
target=UserID.from_string(user_id),
room_id=room_id,
@@ -539,7 +552,7 @@ class RegistrationHandler:
# we don't have a local user in the room to craft up an invite with.
requires_invite = await self.store.is_host_joined(
room_id,
self._server_name,
self.server_name,
)
if requires_invite:
@@ -567,7 +580,7 @@ class RegistrationHandler:
await room_member_handler.update_membership(
requester=create_requester(
self.hs.config.registration.auto_join_user_id,
authenticated_entity=self._server_name,
authenticated_entity=self.server_name,
),
target=UserID.from_string(user_id),
room_id=room_id,
@@ -579,7 +592,7 @@ class RegistrationHandler:
# Send the join.
await room_member_handler.update_membership(
requester=create_requester(
user_id, authenticated_entity=self._server_name
user_id, authenticated_entity=self.server_name
),
target=UserID.from_string(user_id),
room_id=room_id,
@@ -790,6 +803,7 @@ class RegistrationHandler:
login_counter.labels(
guest=is_guest,
auth_provider=(auth_provider_id or ""),
**{SERVER_NAME_LABEL: self.server_name},
).inc()
return (

View File

@@ -202,7 +202,7 @@ class SsoHandler:
def __init__(self, hs: "HomeServer"):
self._clock = hs.get_clock()
self._store = hs.get_datastores().main
self._server_name = hs.hostname
self.server_name = hs.hostname
self._is_mine_server_name = hs.is_mine_server_name
self._registration_handler = hs.get_registration_handler()
self._auth_handler = hs.get_auth_handler()
@@ -238,7 +238,9 @@ class SsoHandler:
p_id = p.idp_id
assert p_id not in self._identity_providers
self._identity_providers[p_id] = p
init_counters_for_auth_provider(p_id)
init_counters_for_auth_provider(
auth_provider_id=p_id, server_name=self.server_name
)
def get_identity_providers(self) -> Mapping[str, SsoIdentityProvider]:
"""Get the configured identity providers"""
@@ -569,7 +571,7 @@ class SsoHandler:
return attributes
# Check if this mxid already exists
user_id = UserID(attributes.localpart, self._server_name).to_string()
user_id = UserID(attributes.localpart, self.server_name).to_string()
if not await self._store.get_users_by_id_case_insensitive(user_id):
# This mxid is free
break
@@ -907,7 +909,7 @@ class SsoHandler:
# render an error page.
html = self._bad_user_template.render(
server_name=self._server_name,
server_name=self.server_name,
user_id_to_verify=user_id_to_verify,
)
respond_with_html(request, 200, html)
@@ -959,7 +961,7 @@ class SsoHandler:
if contains_invalid_mxid_characters(localpart):
raise SynapseError(400, "localpart is invalid: %s" % (localpart,))
user_id = UserID(localpart, self._server_name).to_string()
user_id = UserID(localpart, self.server_name).to_string()
user_infos = await self._store.get_users_by_id_case_insensitive(user_id)
logger.info("[session %s] users: %s", session_id, user_infos)

View File

@@ -63,6 +63,7 @@ from synapse.logging.opentracing import (
start_active_span,
trace,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import PaginateFunction
@@ -104,7 +105,7 @@ non_empty_sync_counter = Counter(
"Count of non empty sync responses. type is initial_sync/full_state_sync"
"/incremental_sync. lazy_loaded indicates if lazy loaded members were "
"enabled for that request.",
["type", "lazy_loaded"],
labelnames=["type", "lazy_loaded", SERVER_NAME_LABEL],
)
# Store the cache that tracks which lazy-loaded members have been sent to a given
@@ -614,7 +615,11 @@ class SyncHandler:
lazy_loaded = "true"
else:
lazy_loaded = "false"
non_empty_sync_counter.labels(sync_label, lazy_loaded).inc()
non_empty_sync_counter.labels(
type=sync_label,
lazy_loaded=lazy_loaded,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
return result

View File

@@ -85,6 +85,7 @@ from synapse.http.replicationagent import ReplicationAgent
from synapse.http.types import QueryParams
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import ISynapseReactor, StrSequence
from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
@@ -108,9 +109,13 @@ except ImportError:
logger = logging.getLogger(__name__)
outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
outgoing_requests_counter = Counter(
"synapse_http_client_requests", "", labelnames=["method", SERVER_NAME_LABEL]
)
incoming_responses_counter = Counter(
"synapse_http_client_responses", "", ["method", "code"]
"synapse_http_client_responses",
"",
labelnames=["method", "code", SERVER_NAME_LABEL],
)
# the type of the headers map, to be passed to the t.w.h.Headers.
@@ -346,6 +351,7 @@ class BaseHttpClient:
treq_args: Optional[Dict[str, Any]] = None,
):
self.hs = hs
self.server_name = hs.hostname
self.reactor = hs.get_reactor()
self._extra_treq_args = treq_args or {}
@@ -384,7 +390,9 @@ class BaseHttpClient:
RequestTimedOutError if the request times out before the headers are read
"""
outgoing_requests_counter.labels(method).inc()
outgoing_requests_counter.labels(
method=method, **{SERVER_NAME_LABEL: self.server_name}
).inc()
# log request but strip `access_token` (AS requests for example include this)
logger.debug("Sending request %s %s", method, redact_uri(uri))
@@ -438,7 +446,11 @@ class BaseHttpClient:
response = await make_deferred_yieldable(request_deferred)
incoming_responses_counter.labels(method, response.code).inc()
incoming_responses_counter.labels(
method=method,
code=response.code,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
logger.info(
"Received response to %s %s: %s",
method,
@@ -447,7 +459,11 @@ class BaseHttpClient:
)
return response
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
incoming_responses_counter.labels(
method=method,
code="ERR",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
logger.info(
"Error sending request to %s %s: %s %s",
method,
@@ -855,6 +871,7 @@ class ReplicationClient(BaseHttpClient):
hs: The HomeServer instance to pass in
"""
super().__init__(hs)
self.server_name = hs.hostname
# Use a pool, but a very small one.
pool = HTTPConnectionPool(self.reactor)
@@ -891,7 +908,9 @@ class ReplicationClient(BaseHttpClient):
RequestTimedOutError if the request times out before the headers are read
"""
outgoing_requests_counter.labels(method).inc()
outgoing_requests_counter.labels(
method=method, **{SERVER_NAME_LABEL: self.server_name}
).inc()
logger.debug("Sending request %s %s", method, uri)
@@ -948,7 +967,11 @@ class ReplicationClient(BaseHttpClient):
response = await make_deferred_yieldable(request_deferred)
incoming_responses_counter.labels(method, response.code).inc()
incoming_responses_counter.labels(
method=method,
code=response.code,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
logger.info(
"Received response to %s %s: %s",
method,
@@ -957,7 +980,11 @@ class ReplicationClient(BaseHttpClient):
)
return response
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
incoming_responses_counter.labels(
method=method,
code="ERR",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
logger.info(
"Error sending request to %s %s: %s %s",
method,

View File

@@ -87,6 +87,7 @@ from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
@@ -99,10 +100,14 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
outgoing_requests_counter = Counter(
"synapse_http_matrixfederationclient_requests", "", ["method"]
"synapse_http_matrixfederationclient_requests",
"",
labelnames=["method", SERVER_NAME_LABEL],
)
incoming_responses_counter = Counter(
"synapse_http_matrixfederationclient_responses", "", ["method", "code"]
"synapse_http_matrixfederationclient_responses",
"",
labelnames=["method", "code", SERVER_NAME_LABEL],
)
@@ -697,7 +702,9 @@ class MatrixFederationHttpClient:
_sec_timeout,
)
outgoing_requests_counter.labels(request.method).inc()
outgoing_requests_counter.labels(
method=request.method, **{SERVER_NAME_LABEL: self.server_name}
).inc()
try:
with Measure(
@@ -736,7 +743,9 @@ class MatrixFederationHttpClient:
raise RequestSendFailed(e, can_retry=True) from e
incoming_responses_counter.labels(
request.method, response.code
method=request.method,
code=response.code,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
set_tag(tags.HTTP_STATUS_CODE, response.code)

View File

@@ -27,40 +27,52 @@ from typing import Dict, Mapping, Set, Tuple
from prometheus_client.core import Counter, Histogram
from synapse.logging.context import current_context
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
logger = logging.getLogger(__name__)
# total number of responses served, split by method/servlet/tag
response_count = Counter(
"synapse_http_server_response_count", "", ["method", "servlet", "tag"]
"synapse_http_server_response_count",
"",
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)
requests_counter = Counter(
"synapse_http_server_requests_received", "", ["method", "servlet"]
"synapse_http_server_requests_received",
"",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)
outgoing_responses_counter = Counter(
"synapse_http_server_responses", "", ["method", "code"]
"synapse_http_server_responses",
"",
labelnames=["method", "code", SERVER_NAME_LABEL],
)
response_timer = Histogram(
"synapse_http_server_response_time_seconds",
"sec",
["method", "servlet", "tag", "code"],
labelnames=["method", "servlet", "tag", "code", SERVER_NAME_LABEL],
)
response_ru_utime = Counter(
"synapse_http_server_response_ru_utime_seconds", "sec", ["method", "servlet", "tag"]
"synapse_http_server_response_ru_utime_seconds",
"sec",
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)
response_ru_stime = Counter(
"synapse_http_server_response_ru_stime_seconds", "sec", ["method", "servlet", "tag"]
"synapse_http_server_response_ru_stime_seconds",
"sec",
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)
response_db_txn_count = Counter(
"synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"]
"synapse_http_server_response_db_txn_count",
"",
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)
# seconds spent waiting for db txns, excluding scheduling time, when processing
@@ -68,34 +80,42 @@ response_db_txn_count = Counter(
response_db_txn_duration = Counter(
"synapse_http_server_response_db_txn_duration_seconds",
"",
["method", "servlet", "tag"],
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)
# seconds spent waiting for a db connection, when processing this request
response_db_sched_duration = Counter(
"synapse_http_server_response_db_sched_duration_seconds",
"",
["method", "servlet", "tag"],
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)
# size in bytes of the response written
response_size = Counter(
"synapse_http_server_response_size", "", ["method", "servlet", "tag"]
"synapse_http_server_response_size",
"",
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)
# In flight metrics are incremented while the requests are in flight, rather
# than when the response was written.
in_flight_requests_ru_utime = Counter(
"synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"]
"synapse_http_server_in_flight_requests_ru_utime_seconds",
"",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)
in_flight_requests_ru_stime = Counter(
"synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"]
"synapse_http_server_in_flight_requests_ru_stime_seconds",
"",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)
in_flight_requests_db_txn_count = Counter(
"synapse_http_server_in_flight_requests_db_txn_count", "", ["method", "servlet"]
"synapse_http_server_in_flight_requests_db_txn_count",
"",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)
# seconds spent waiting for db txns, excluding scheduling time, when processing
@@ -103,14 +123,14 @@ in_flight_requests_db_txn_count = Counter(
in_flight_requests_db_txn_duration = Counter(
"synapse_http_server_in_flight_requests_db_txn_duration_seconds",
"",
["method", "servlet"],
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)
# seconds spent waiting for a db connection, when processing this request
in_flight_requests_db_sched_duration = Counter(
"synapse_http_server_in_flight_requests_db_sched_duration_seconds",
"",
["method", "servlet"],
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)
_in_flight_requests: Set["RequestMetrics"] = set()
@@ -149,6 +169,13 @@ LaterGauge(
class RequestMetrics:
def __init__(self, our_server_name: str) -> None:
"""
Args:
our_server_name: Our homeserver name (used to label metrics) (`hs.hostname`)
"""
self.our_server_name = our_server_name
def start(self, time_sec: float, name: str, method: str) -> None:
self.start_ts = time_sec
self.start_context = current_context()
@@ -194,33 +221,39 @@ class RequestMetrics:
response_code_str = str(response_code)
outgoing_responses_counter.labels(self.method, response_code_str).inc()
outgoing_responses_counter.labels(
method=self.method,
code=response_code_str,
**{SERVER_NAME_LABEL: self.our_server_name},
).inc()
response_count.labels(self.method, self.name, tag).inc()
response_base_labels = {
"method": self.method,
"servlet": self.name,
"tag": tag,
SERVER_NAME_LABEL: self.our_server_name,
}
response_timer.labels(self.method, self.name, tag, response_code_str).observe(
response_count.labels(**response_base_labels).inc()
response_timer.labels(code=response_code_str, **response_base_labels).observe(
time_sec - self.start_ts
)
resource_usage = context.get_resource_usage()
response_ru_utime.labels(self.method, self.name, tag).inc(
resource_usage.ru_utime
)
response_ru_stime.labels(self.method, self.name, tag).inc(
resource_usage.ru_stime
)
response_db_txn_count.labels(self.method, self.name, tag).inc(
response_ru_utime.labels(**response_base_labels).inc(resource_usage.ru_utime)
response_ru_stime.labels(**response_base_labels).inc(resource_usage.ru_stime)
response_db_txn_count.labels(**response_base_labels).inc(
resource_usage.db_txn_count
)
response_db_txn_duration.labels(self.method, self.name, tag).inc(
response_db_txn_duration.labels(**response_base_labels).inc(
resource_usage.db_txn_duration_sec
)
response_db_sched_duration.labels(self.method, self.name, tag).inc(
response_db_sched_duration.labels(**response_base_labels).inc(
resource_usage.db_sched_duration_sec
)
response_size.labels(self.method, self.name, tag).inc(sent_bytes)
response_size.labels(**response_base_labels).inc(sent_bytes)
# We always call this at the end to ensure that we update the metrics
# regardless of whether a call to /metrics while the request was in
@@ -240,24 +273,30 @@ class RequestMetrics:
diff = new_stats - self._request_stats
self._request_stats = new_stats
in_flight_labels = {
"method": self.method,
"servlet": self.name,
SERVER_NAME_LABEL: self.our_server_name,
}
# max() is used since rapid use of ru_stime/ru_utime can end up with the
# count going backwards due to NTP, time smearing, fine-grained
# correction, or floating points. Who knows, really?
in_flight_requests_ru_utime.labels(self.method, self.name).inc(
in_flight_requests_ru_utime.labels(**in_flight_labels).inc(
max(diff.ru_utime, 0)
)
in_flight_requests_ru_stime.labels(self.method, self.name).inc(
in_flight_requests_ru_stime.labels(**in_flight_labels).inc(
max(diff.ru_stime, 0)
)
in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
in_flight_requests_db_txn_count.labels(**in_flight_labels).inc(
diff.db_txn_count
)
in_flight_requests_db_txn_duration.labels(self.method, self.name).inc(
in_flight_requests_db_txn_duration.labels(**in_flight_labels).inc(
diff.db_txn_duration_sec
)
in_flight_requests_db_sched_duration.labels(self.method, self.name).inc(
in_flight_requests_db_sched_duration.labels(**in_flight_labels).inc(
diff.db_sched_duration_sec
)
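
The `response_base_labels` and `in_flight_labels` dicts above exploit the fact that `.labels()` accepts keyword arguments: labels shared by several metrics are built once and unpacked into each call, while a metric that needs an extra label (such as `code` on the histogram) passes it alongside the unpacked dict. A sketch under the same assumptions, with illustrative names:

```python
from prometheus_client import Counter, Histogram

SERVER_NAME_LABEL = "server_name"  # stand-in for synapse.metrics.SERVER_NAME_LABEL

response_count = Counter(
    "demo_response_count",
    "",
    labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)
response_timer = Histogram(
    "demo_response_time_seconds",
    "sec",
    labelnames=["method", "servlet", "tag", "code", SERVER_NAME_LABEL],
)

def record(method: str, servlet: str, tag: str, code: int, seconds: float) -> None:
    # Build the shared labels once, then unpack them into every metric that
    # uses them; per-metric extras ride alongside as normal keyword args.
    base_labels = {
        "method": method,
        "servlet": servlet,
        "tag": tag,
        SERVER_NAME_LABEL: "example.com",
    }
    response_count.labels(**base_labels).inc()
    response_timer.labels(code=str(code), **base_labels).observe(seconds)

record("GET", "sync", "", 200, 0.05)
```

The `max(diff.ru_utime, 0)` clamps in the in-flight hunks are load-bearing: `Counter.inc()` raises `ValueError` for negative amounts, so a resource-usage delta that went backwards would otherwise crash the request accounting rather than merely under-count.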

View File

@@ -44,6 +44,7 @@ from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import ISynapseReactor, Requester
if TYPE_CHECKING:
@@ -83,12 +84,14 @@ class SynapseRequest(Request):
self,
channel: HTTPChannel,
site: "SynapseSite",
our_server_name: str,
*args: Any,
max_request_body_size: int = 1024,
request_id_header: Optional[str] = None,
**kw: Any,
):
super().__init__(channel, *args, **kw)
self.our_server_name = our_server_name
self._max_request_body_size = max_request_body_size
self.request_id_header = request_id_header
self.synapse_site = site
@@ -334,7 +337,11 @@ class SynapseRequest(Request):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.labels(self.get_method(), self.request_metrics.name).inc()
requests_counter.labels(
method=self.get_method(),
servlet=self.request_metrics.name,
**{SERVER_NAME_LABEL: self.our_server_name},
).inc()
@contextlib.contextmanager
def processing(self) -> Generator[None, None, None]:
@@ -455,7 +462,7 @@ class SynapseRequest(Request):
self.request_metrics.name.
"""
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics = RequestMetrics(our_server_name=self.our_server_name)
self.request_metrics.start(
self.start_time, name=servlet_name, method=self.get_method()
)
@@ -694,6 +701,7 @@ class SynapseSite(ProxySite):
self.site_tag = site_tag
self.reactor: ISynapseReactor = reactor
self.server_name = hs.hostname
assert config.http_options is not None
proxied = config.http_options.x_forwarded
@@ -705,6 +713,7 @@ class SynapseSite(ProxySite):
return request_class(
channel,
self,
our_server_name=self.server_name,
max_request_body_size=max_request_body_size,
queued=queued,
request_id_header=request_id_header,

View File

@@ -91,6 +91,7 @@ terms, an endpoint you can scrape is called an *instance*, usually corresponding
single process." (source: https://prometheus.io/docs/concepts/jobs_instances/)
"""
CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
"""
Content type of the latest text format for Prometheus metrics.
@@ -471,18 +472,24 @@ REGISTRY.register(CPUMetrics())
# Federation Metrics
#
sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
sent_transactions_counter = Counter(
"synapse_federation_client_sent_transactions", "", labelnames=[SERVER_NAME_LABEL]
)
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
events_processed_counter = Counter(
"synapse_federation_client_events_processed", "", labelnames=[SERVER_NAME_LABEL]
)
event_processing_loop_counter = Counter(
"synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
"synapse_event_processing_loop_count",
"Event processing loop iterations",
labelnames=["name", SERVER_NAME_LABEL],
)
event_processing_loop_room_count = Counter(
"synapse_event_processing_loop_room_count",
"Rooms seen per event processing loop iteration",
["name"],
labelnames=["name", SERVER_NAME_LABEL],
)
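
To see the new label in the flesh without standing up a homeserver, the registry can be rendered to the same text format the metrics listener serves. A small sketch against a private `CollectorRegistry` (the metric name is illustrative):

```python
from prometheus_client import CollectorRegistry, Counter, generate_latest

SERVER_NAME_LABEL = "server_name"  # stand-in constant

registry = CollectorRegistry()
sent_transactions = Counter(
    "demo_federation_client_sent_transactions",
    "",
    labelnames=[SERVER_NAME_LABEL],
    registry=registry,
)
sent_transactions.labels(**{SERVER_NAME_LABEL: "example.com"}).inc()

# Renders the exposition text format; the sample line should read:
#   demo_federation_client_sent_transactions_total{server_name="example.com"} 1.0
print(generate_latest(registry).decode("utf-8"))
```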

View File

@@ -50,7 +50,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
ISynapseReactor,
@@ -74,10 +74,15 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
notified_events_counter = Counter("synapse_notifier_notified_events", "")
# FIXME: Unused metric, remove if not needed.
notified_events_counter = Counter(
"synapse_notifier_notified_events", "", labelnames=[SERVER_NAME_LABEL]
)
users_woken_by_stream_counter = Counter(
"synapse_notifier_users_woken_by_stream", "", ["stream"]
"synapse_notifier_users_woken_by_stream",
"",
labelnames=["stream", SERVER_NAME_LABEL],
)
T = TypeVar("T")
@@ -224,6 +229,7 @@ class Notifier:
self.room_to_user_streams: Dict[str, Set[_NotifierUserStream]] = {}
self.hs = hs
self.server_name = hs.hostname
self._storage_controllers = hs.get_storage_controllers()
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastores().main
@@ -350,9 +356,10 @@ class Notifier:
for listener in listeners:
listener.callback(current_token)
users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
len(user_streams)
)
users_woken_by_stream_counter.labels(
stream=StreamKeyType.UN_PARTIAL_STATED_ROOMS,
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(user_streams))
# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
@@ -575,7 +582,10 @@ class Notifier:
listener.callback(current_token)
if user_streams:
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
users_woken_by_stream_counter.labels(
stream=stream_key,
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(user_streams))
self.notify_replication()

View File

@@ -50,6 +50,7 @@ from synapse.event_auth import auth_types_for_event, get_user_power_level
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import SERVER_NAME_LABEL
from synapse.state import CREATE_KEY, POWER_KEY
from synapse.storage.databases.main.roommember import EventIdMembership
from synapse.storage.invite_rule import InviteRule
@@ -68,11 +69,17 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# FIXME: Unused metric, remove if not needed.
push_rules_invalidation_counter = Counter(
"synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", ""
"synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter",
"",
labelnames=[SERVER_NAME_LABEL],
)
# FIXME: Unused metric, remove if not needed.
push_rules_state_size_counter = Counter(
"synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", ""
"synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter",
"",
labelnames=[SERVER_NAME_LABEL],
)

View File

@@ -31,6 +31,7 @@ from twisted.internet.interfaces import IDelayedCall
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.logging import opentracing
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.storage.databases.main.event_push_actions import HttpPushAction
@@ -46,21 +47,25 @@ logger = logging.getLogger(__name__)
http_push_processed_counter = Counter(
"synapse_http_httppusher_http_pushes_processed",
"Number of push notifications successfully sent",
labelnames=[SERVER_NAME_LABEL],
)
http_push_failed_counter = Counter(
"synapse_http_httppusher_http_pushes_failed",
"Number of push notifications which failed",
labelnames=[SERVER_NAME_LABEL],
)
http_badges_processed_counter = Counter(
"synapse_http_httppusher_badge_updates_processed",
"Number of badge updates successfully sent",
labelnames=[SERVER_NAME_LABEL],
)
http_badges_failed_counter = Counter(
"synapse_http_httppusher_badge_updates_failed",
"Number of badge updates which failed",
labelnames=[SERVER_NAME_LABEL],
)
@@ -268,7 +273,9 @@ class HttpPusher(Pusher):
processed = await self._process_one(push_action)
if processed:
http_push_processed_counter.inc()
http_push_processed_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action.stream_ordering
pusher_still_exists = (
@@ -292,7 +299,9 @@ class HttpPusher(Pusher):
self.app_id, self.pushkey, self.user_id, self.failing_since
)
else:
http_push_failed_counter.inc()
http_push_failed_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
if not self.failing_since:
self.failing_since = self.clock.time_msec()
await self.store.update_pusher_failing_since(
@@ -543,9 +552,13 @@ class HttpPusher(Pusher):
}
try:
await self.http_client.post_json_get_json(self.url, d)
http_badges_processed_counter.inc()
http_badges_processed_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
except Exception as e:
logger.warning(
"Failed to send badge count to %s: %s %s", self.name, type(e), e
)
http_badges_failed_counter.inc()
http_badges_failed_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
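
For pushers the only label is the server name, so every call site now re-resolves the child via `.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()`. An alternative shape, not used by this PR, is to bind the child once per instance, since `.labels()` returns a reusable child object; a sketch with illustrative names:

```python
from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # stand-in constant

http_push_processed_counter = Counter(
    "demo_http_pushes_processed",
    "Number of push notifications successfully sent",
    labelnames=[SERVER_NAME_LABEL],
)

class Pusher:
    def __init__(self, server_name: str) -> None:
        # .labels() returns the child for this label combination; caching it
        # skips the per-call lookup and keeps call sites one-liners.
        self._pushes_processed = http_push_processed_counter.labels(
            **{SERVER_NAME_LABEL: server_name}
        )

    def on_push_sent(self) -> None:
        self._pushes_processed.inc()

Pusher("example.com").on_push_sent()
```

The explicit per-call form used in the diff keeps every increment grep-able for `SERVER_NAME_LABEL`; the cached-child form trades that visibility for brevity.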

View File

@@ -32,6 +32,7 @@ from synapse.api.constants import EventContentFields, EventTypes, Membership, Ro
from synapse.api.errors import StoreError
from synapse.config.emailconfig import EmailSubjectConfig
from synapse.events import EventBase
from synapse.metrics import SERVER_NAME_LABEL
from synapse.push.presentable_names import (
calculate_room_name,
descriptor_from_member_events,
@@ -60,7 +61,7 @@ T = TypeVar("T")
emails_sent_counter = Counter(
"synapse_emails_sent_total",
"Emails sent by type",
["type"],
labelnames=["type", SERVER_NAME_LABEL],
)
@@ -123,6 +124,7 @@ class Mailer:
template_text: jinja2.Template,
):
self.hs = hs
self.server_name = hs.hostname
self.template_html = template_html
self.template_text = template_text
@@ -137,8 +139,6 @@ class Mailer:
logger.info("Created Mailer for app_name %s", app_name)
emails_sent_counter.labels("password_reset")
async def send_password_reset_mail(
self, email_address: str, token: str, client_secret: str, sid: str
) -> None:
@@ -162,7 +162,10 @@ class Mailer:
template_vars: TemplateVars = {"link": link}
emails_sent_counter.labels("password_reset").inc()
emails_sent_counter.labels(
type="password_reset",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
await self.send_email(
email_address,
@@ -171,8 +174,6 @@ class Mailer:
template_vars,
)
emails_sent_counter.labels("registration")
async def send_registration_mail(
self, email_address: str, token: str, client_secret: str, sid: str
) -> None:
@@ -196,7 +197,10 @@ class Mailer:
template_vars: TemplateVars = {"link": link}
emails_sent_counter.labels("registration").inc()
emails_sent_counter.labels(
type="registration",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
await self.send_email(
email_address,
@@ -205,8 +209,6 @@ class Mailer:
template_vars,
)
emails_sent_counter.labels("already_in_use")
async def send_already_in_use_mail(self, email_address: str) -> None:
"""Send an email if the address is already bound to an user account
@@ -214,6 +216,11 @@ class Mailer:
email_address: Email address we're sending the "already in use" mail to
"""
emails_sent_counter.labels(
type="already_in_use",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
await self.send_email(
email_address,
self.email_subjects.email_already_in_use
@@ -221,8 +228,6 @@ class Mailer:
{},
)
emails_sent_counter.labels("add_threepid")
async def send_add_threepid_mail(
self, email_address: str, token: str, client_secret: str, sid: str
) -> None:
@@ -247,7 +252,10 @@ class Mailer:
template_vars: TemplateVars = {"link": link}
emails_sent_counter.labels("add_threepid").inc()
emails_sent_counter.labels(
type="add_threepid",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
await self.send_email(
email_address,
@@ -256,8 +264,6 @@ class Mailer:
template_vars,
)
emails_sent_counter.labels("notification")
async def send_notification_mail(
self,
app_id: str,
@@ -352,7 +358,10 @@ class Mailer:
"reason": reason,
}
emails_sent_counter.labels("notification").inc()
emails_sent_counter.labels(
type="notification",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
await self.send_email(
email_address, summary_text, template_vars, unsubscribe_link
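
The removed `emails_sent_counter.labels("password_reset")` lines were pre-registrations: calling `.labels()` without `.inc()` materialises the child at 0, so the series appears in scrapes before the first email is ever sent. With the server name in the label set, that can only happen somewhere that knows the instance's `server_name`; a sketch of how per-instance pre-registration could look (illustrative, not from the PR):

```python
from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # stand-in constant

emails_sent_counter = Counter(
    "demo_emails_sent_total",
    "Emails sent by type",
    labelnames=["type", SERVER_NAME_LABEL],
)

class Mailer:
    def __init__(self, server_name: str) -> None:
        self.server_name = server_name
        # Touching .labels() without .inc() exports each series at 0, so
        # dashboards see the counters before the first send. This can only
        # happen per instance now that server_name is part of the key.
        for email_type in ("password_reset", "registration", "notification"):
            emails_sent_counter.labels(
                type=email_type, **{SERVER_NAME_LABEL: self.server_name}
            )

    def send_password_reset(self) -> None:
        emails_sent_counter.labels(
            type="password_reset", **{SERVER_NAME_LABEL: self.server_name}
        ).inc()

Mailer("example.com").send_password_reset()
```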

View File

@@ -38,6 +38,7 @@ from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing
from synapse.logging.opentracing import trace_with_opname
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.cancellation import is_function_cancellable
@@ -57,7 +58,7 @@ _pending_outgoing_requests = Gauge(
_outgoing_request_counter = Counter(
"synapse_outgoing_replication_requests",
"Number of outgoing replication requests, by replication method name and result",
["name", "code"],
labelnames=["name", "code", SERVER_NAME_LABEL],
)
@@ -205,6 +206,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
parameter to specify which instance to hit (the instance must be in
the `instance_map` config).
"""
server_name = hs.hostname
clock = hs.get_clock()
client = hs.get_replication_client()
local_instance_name = hs.get_instance_name()
@@ -333,15 +335,27 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
_outgoing_request_counter.labels(
name=cls.NAME,
code=e.code,
**{SERVER_NAME_LABEL: server_name},
).inc()
raise e.to_synapse_error()
except Exception as e:
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
_outgoing_request_counter.labels(
name=cls.NAME,
code="ERR",
**{SERVER_NAME_LABEL: server_name},
).inc()
raise SynapseError(
502, f"Failed to talk to {instance_name} process"
) from e
_outgoing_request_counter.labels(cls.NAME, 200).inc()
_outgoing_request_counter.labels(
name=cls.NAME,
code=200,
**{SERVER_NAME_LABEL: server_name},
).inc()
# Wait on any streams that the remote may have written to.
for stream_name, position in result.pop(
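
Each outgoing replication request above lands in exactly one of three buckets: the `SynapseError` code, the catch-all `"ERR"`, or `200`. The same accounting pattern in a condensed, runnable sketch (the counter and exception types are stand-ins):

```python
from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # stand-in constant

outgoing_request_counter = Counter(
    "demo_outgoing_replication_requests",
    "Outgoing replication requests, by method name and result",
    labelnames=["name", "code", SERVER_NAME_LABEL],
)

def send_request(name: str, server_name: str) -> None:
    def count(code: object) -> None:
        outgoing_request_counter.labels(
            name=name, code=str(code), **{SERVER_NAME_LABEL: server_name}
        ).inc()

    try:
        pass  # the actual request would happen here
    except ValueError:  # stand-in for the expected, typed failure
        count(400)
        raise
    except Exception:
        count("ERR")  # unknown-failure bucket
        raise
    count(200)  # success

send_request("register_user", "example.com")
```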

View File

@@ -26,6 +26,7 @@ from prometheus_client import Counter, Histogram
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable
from synapse.metrics import SERVER_NAME_LABEL
from synapse.util import json_decoder, json_encoder
if TYPE_CHECKING:
@@ -36,13 +37,13 @@ if TYPE_CHECKING:
set_counter = Counter(
"synapse_external_cache_set",
"Number of times we set a cache",
labelnames=["cache_name"],
labelnames=["cache_name", SERVER_NAME_LABEL],
)
get_counter = Counter(
"synapse_external_cache_get",
"Number of times we get a cache",
labelnames=["cache_name", "hit"],
labelnames=["cache_name", "hit", SERVER_NAME_LABEL],
)
response_timer = Histogram(
@@ -69,6 +70,8 @@ class ExternalCache:
"""
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
if hs.config.redis.redis_enabled:
self._redis_connection: Optional["ConnectionHandler"] = (
hs.get_outbound_redis_connection()
@@ -93,7 +96,9 @@ class ExternalCache:
if self._redis_connection is None:
return
set_counter.labels(cache_name).inc()
set_counter.labels(
cache_name=cache_name, **{SERVER_NAME_LABEL: self.server_name}
).inc()
# txredisapi requires the value to be string, bytes or numbers, so we
# encode stuff in JSON.
@@ -131,7 +136,11 @@ class ExternalCache:
logger.debug("Got cache result %s %s: %r", cache_name, key, result)
get_counter.labels(cache_name, result is not None).inc()
get_counter.labels(
cache_name=cache_name,
hit=result is not None,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
if not result:
return None

View File

@@ -40,7 +40,7 @@ from prometheus_client import Counter
from twisted.internet.protocol import ReconnectingClientFactory
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
@@ -85,13 +85,26 @@ logger = logging.getLogger(__name__)
# number of updates received for each RDATA stream
inbound_rdata_count = Counter(
"synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
"synapse_replication_tcp_protocol_inbound_rdata_count",
"",
labelnames=["stream_name", SERVER_NAME_LABEL],
)
user_sync_counter = Counter(
"synapse_replication_tcp_resource_user_sync", "", labelnames=[SERVER_NAME_LABEL]
)
federation_ack_counter = Counter(
"synapse_replication_tcp_resource_federation_ack",
"",
labelnames=[SERVER_NAME_LABEL],
)
# FIXME: Unused metric, remove if not needed.
remove_pusher_counter = Counter(
"synapse_replication_tcp_resource_remove_pusher", "", labelnames=[SERVER_NAME_LABEL]
)
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
user_ip_cache_counter = Counter(
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
)
# the type of the entries in _command_queues_by_stream
@@ -460,7 +473,7 @@ class ReplicationCommandHandler:
def on_USER_SYNC(
self, conn: IReplicationConnection, cmd: UserSyncCommand
) -> Optional[Awaitable[None]]:
user_sync_counter.inc()
user_sync_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()
if self._is_presence_writer:
return self._presence_handler.update_external_syncs_row(
@@ -484,7 +497,7 @@ class ReplicationCommandHandler:
def on_FEDERATION_ACK(
self, conn: IReplicationConnection, cmd: FederationAckCommand
) -> None:
federation_ack_counter.inc()
federation_ack_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()
if self._federation_sender:
self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
@@ -492,7 +505,7 @@ class ReplicationCommandHandler:
def on_USER_IP(
self, conn: IReplicationConnection, cmd: UserIpCommand
) -> Optional[Awaitable[None]]:
user_ip_cache_counter.inc()
user_ip_cache_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()
if self._is_master or self._should_insert_client_ips:
# We make a point of only returning an awaitable if there's actually
@@ -532,7 +545,9 @@ class ReplicationCommandHandler:
return
stream_name = cmd.stream_name
inbound_rdata_count.labels(stream_name).inc()
inbound_rdata_count.labels(
stream_name=stream_name, **{SERVER_NAME_LABEL: self.server_name}
).inc()
# We put the received command into a queue here for two reasons:
# 1. so we don't try and concurrently handle multiple rows for the

View File

@@ -36,10 +36,7 @@ from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.interfaces import IAddress, IConnector
from twisted.python.failure import Failure
from synapse.logging.context import (
PreserveLoggingContext,
make_deferred_yieldable,
)
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import (
BackgroundProcessLoggingContext,

View File

@@ -29,6 +29,7 @@ from prometheus_client import Counter
from twisted.internet.interfaces import IAddress
from twisted.internet.protocol import ServerFactory
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
@@ -40,7 +41,9 @@ if TYPE_CHECKING:
from synapse.server import HomeServer
stream_updates_counter = Counter(
"synapse_replication_tcp_resource_stream_updates", "", ["stream_name"]
"synapse_replication_tcp_resource_stream_updates",
"",
labelnames=["stream_name", SERVER_NAME_LABEL],
)
logger = logging.getLogger(__name__)
@@ -226,7 +229,10 @@ class ReplicationStreamer:
len(updates),
current_token,
)
stream_updates_counter.labels(stream.NAME).inc(len(updates))
stream_updates_counter.labels(
stream_name=stream.NAME,
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(updates))
else:
# The token has advanced but there is no data to

View File

@@ -323,10 +323,12 @@ class UsernameAvailabilityRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.server_name = hs.hostname
self.registration_handler = hs.get_registration_handler()
self.ratelimiter = FederationRateLimiter(
hs.get_clock(),
FederationRatelimitSettings(
our_server_name=self.server_name,
clock=hs.get_clock(),
config=FederationRatelimitSettings(
# Time window of 2s
window_size=2000,
# Artificially delay requests if rate > sleep_limit/window_size

View File

@@ -849,7 +849,8 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_federation_ratelimiter(self) -> FederationRateLimiter:
return FederationRateLimiter(
self.get_clock(),
our_server_name=self.hostname,
clock=self.get_clock(),
config=self.config.ratelimiting.rc_federation,
metrics_name="federation_servlets",
)

View File

@@ -51,6 +51,7 @@ from synapse.events.snapshot import (
)
from synapse.logging.context import ContextResourceUsage
from synapse.logging.opentracing import tag_args, trace
from synapse.metrics import SERVER_NAME_LABEL
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
@@ -607,11 +608,13 @@ _biggest_room_by_cpu_counter = Counter(
"synapse_state_res_cpu_for_biggest_room_seconds",
"CPU time spent performing state resolution for the single most expensive "
"room for state resolution",
labelnames=[SERVER_NAME_LABEL],
)
_biggest_room_by_db_counter = Counter(
"synapse_state_res_db_for_biggest_room_seconds",
"Database time spent performing state resolution for the single most "
"expensive room for state resolution",
labelnames=[SERVER_NAME_LABEL],
)
_cpu_times = Histogram(
@@ -880,7 +883,9 @@ class StateResolutionHandler:
# report info on the single biggest to prometheus
_, biggest_metrics = biggest[0]
prometheus_counter_metric.inc(extract_key(biggest_metrics))
prometheus_counter_metric.labels(**{SERVER_NAME_LABEL: self.server_name}).inc(
extract_key(biggest_metrics)
)
def _make_state_cache_entry(

View File

@@ -61,6 +61,7 @@ from synapse.logging.opentracing import (
start_active_span_follows_from,
trace,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
@@ -82,19 +83,23 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# The number of times we are recalculating the current state
state_delta_counter = Counter("synapse_storage_events_state_delta", "")
state_delta_counter = Counter(
"synapse_storage_events_state_delta", "", labelnames=[SERVER_NAME_LABEL]
)
# The number of times we are recalculating state when there is only a
# single forward extremity
state_delta_single_event_counter = Counter(
"synapse_storage_events_state_delta_single_event", ""
"synapse_storage_events_state_delta_single_event",
"",
labelnames=[SERVER_NAME_LABEL],
)
# The number of times we are recalculating state when we could have reasonably
# calculated the delta when we calculated the state for an event we were
# persisting.
state_delta_reuse_delta_counter = Counter(
"synapse_storage_events_state_delta_reuse_delta", ""
"synapse_storage_events_state_delta_reuse_delta", "", labelnames=[SERVER_NAME_LABEL]
)
# The number of forward extremities for each new event.
@@ -115,16 +120,19 @@ stale_forward_extremities_counter = Histogram(
state_resolutions_during_persistence = Counter(
"synapse_storage_events_state_resolutions_during_persistence",
"Number of times we had to do state res to calculate new current state",
labelnames=[SERVER_NAME_LABEL],
)
potential_times_prune_extremities = Counter(
"synapse_storage_events_potential_times_prune_extremities",
"Number of times we might be able to prune extremities",
labelnames=[SERVER_NAME_LABEL],
)
times_pruned_extremities = Counter(
"synapse_storage_events_times_pruned_extremities",
"Number of times we were actually be able to prune extremities",
labelnames=[SERVER_NAME_LABEL],
)
@@ -709,9 +717,11 @@ class EventsPersistenceStorageController:
if all_single_prev_not_state:
return (new_forward_extremities, None)
state_delta_counter.inc()
state_delta_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()
if len(new_latest_event_ids) == 1:
state_delta_single_event_counter.inc()
state_delta_single_event_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
# This is a fairly handwavey check to see if we could
# have guessed what the delta would have been when
@@ -726,7 +736,9 @@ class EventsPersistenceStorageController:
for ev, _ in ev_ctx_rm:
prev_event_ids = set(ev.prev_event_ids())
if latest_event_ids == prev_event_ids:
state_delta_reuse_delta_counter.inc()
state_delta_reuse_delta_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
break
logger.debug("Calculating state delta for room %s", room_id)
@@ -996,7 +1008,9 @@ class EventsPersistenceStorageController:
),
)
state_resolutions_during_persistence.inc()
state_resolutions_during_persistence.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
# If the returned state matches the state group of one of the new
# forward extremities then we check if we are able to prune some state
@@ -1024,7 +1038,9 @@ class EventsPersistenceStorageController:
"""See if we can prune any of the extremities after calculating the
resolved state.
"""
potential_times_prune_extremities.inc()
potential_times_prune_extremities.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
# We keep all the extremities that have the same state group, and
# see if we can drop the others.
@@ -1122,7 +1138,7 @@ class EventsPersistenceStorageController:
return new_latest_event_ids
times_pruned_extremities.inc()
times_pruned_extremities.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()
logger.info(
"Pruning forward extremities in room %s: from %s -> %s",

View File

@@ -61,7 +61,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.metrics import LaterGauge, register_threadpool
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -85,8 +85,16 @@ perf_logger = logging.getLogger("synapse.storage.TIME")
sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")
sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
sql_txn_count = Counter("synapse_storage_transaction_time_count", "sec", ["desc"])
sql_txn_duration = Counter("synapse_storage_transaction_time_sum", "sec", ["desc"])
sql_txn_count = Counter(
"synapse_storage_transaction_time_count",
"sec",
labelnames=["desc", SERVER_NAME_LABEL],
)
sql_txn_duration = Counter(
"synapse_storage_transaction_time_sum",
"sec",
labelnames=["desc", SERVER_NAME_LABEL],
)
# Unique indexes which have been added in background updates. Maps from table name
@@ -869,8 +877,14 @@ class DatabasePool:
self._current_txn_total_time += duration
self._txn_perf_counters.update(desc, duration)
sql_txn_count.labels(desc).inc(1)
sql_txn_duration.labels(desc).inc(duration)
sql_txn_count.labels(
desc=desc,
**{SERVER_NAME_LABEL: self.server_name},
).inc(1)
sql_txn_duration.labels(
desc=desc,
**{SERVER_NAME_LABEL: self.server_name},
).inc(duration)
async def runInteraction(
self,

View File

@@ -45,6 +45,7 @@ from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.logging.opentracing import tag_args, trace
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.background_updates import ForeignKeyConstraint
@@ -81,6 +82,7 @@ pdus_pruned_from_federation_queue = Counter(
"synapse_federation_server_number_inbound_pdu_pruned",
"The number of events in the inbound federation staging that have been "
"pruned due to the queue getting too long",
labelnames=[SERVER_NAME_LABEL],
)
logger = logging.getLogger(__name__)
@@ -2003,7 +2005,9 @@ class EventFederationWorkerStore(
if not to_delete:
return False
pdus_pruned_from_federation_queue.inc(len(to_delete))
pdus_pruned_from_federation_queue.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc(len(to_delete))
logger.info(
"Pruning %d events in room %s from federation queue",
len(to_delete),

View File

@@ -55,6 +55,7 @@ from synapse.events import EventBase, StrippedStateEvent, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.events.utils import parse_stripped_state_event
from synapse.logging.opentracing import trace
from synapse.metrics import SERVER_NAME_LABEL
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -89,11 +90,13 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
persist_event_counter = Counter(
"synapse_storage_events_persisted_events", "", labelnames=[SERVER_NAME_LABEL]
)
event_counter = Counter(
"synapse_storage_events_persisted_events_sep",
"",
["type", "origin_type", "origin_entity"],
labelnames=["type", "origin_type", "origin_entity", SERVER_NAME_LABEL],
)
# State event type/key pairs that we need to gather to fill in the
@@ -237,6 +240,7 @@ class PersistEventsStore:
db_conn: LoggingDatabaseConnection,
):
self.hs = hs
self.server_name = hs.hostname
self.db_pool = db
self.store = main_data_store
self.database_engine = db.engine
@@ -357,7 +361,9 @@ class PersistEventsStore:
new_event_links=new_event_links,
sliding_sync_table_changes=sliding_sync_table_changes,
)
persist_event_counter.inc(len(events_and_contexts))
persist_event_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc(
len(events_and_contexts)
)
if not use_negative_stream_ordering:
# we don't want to set the event_persisted_position to a negative
@@ -375,7 +381,12 @@ class PersistEventsStore:
origin_type = "remote"
origin_entity = get_domain_from_id(event.sender)
event_counter.labels(event.type, origin_type, origin_entity).inc()
event_counter.labels(
type=event.type,
origin_type=origin_type,
origin_entity=origin_entity,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
if (
not self.hs.config.experimental.msc4293_enabled
@@ -2839,7 +2850,7 @@ class PersistEventsStore:
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
) -> None:
to_prefill = []
to_prefill: List[EventCacheEntry] = []
ev_map = {e.event_id: e for e, _ in events_and_contexts}
if not ev_map:

View File

@@ -52,7 +52,7 @@ from synapse.logging.context import (
run_in_background,
)
from synapse.logging.opentracing import start_active_span
from synapse.metrics import Histogram, LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, Histogram, LaterGauge
from synapse.util import Clock
if typing.TYPE_CHECKING:
@@ -65,12 +65,12 @@ logger = logging.getLogger(__name__)
rate_limit_sleep_counter = Counter(
"synapse_rate_limit_sleep",
"Number of requests slept by the rate limiter",
["rate_limiter_name"],
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
)
rate_limit_reject_counter = Counter(
"synapse_rate_limit_reject",
"Number of requests rejected by the rate limiter",
["rate_limiter_name"],
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
)
queue_wait_timer = Histogram(
"synapse_rate_limit_queue_wait_time_seconds",
@@ -157,6 +157,7 @@ class FederationRateLimiter:
def __init__(
self,
our_server_name: str,
clock: Clock,
config: FederationRatelimitSettings,
metrics_name: Optional[str] = None,
@@ -174,7 +175,10 @@ class FederationRateLimiter:
def new_limiter() -> "_PerHostRatelimiter":
return _PerHostRatelimiter(
clock=clock, config=config, metrics_name=metrics_name
our_server_name=our_server_name,
clock=clock,
config=config,
metrics_name=metrics_name,
)
self.ratelimiters: DefaultDict[str, "_PerHostRatelimiter"] = (
@@ -205,6 +209,7 @@ class FederationRateLimiter:
class _PerHostRatelimiter:
def __init__(
self,
our_server_name: str,
clock: Clock,
config: FederationRatelimitSettings,
metrics_name: Optional[str] = None,
@@ -218,6 +223,7 @@ class _PerHostRatelimiter:
from the rest in the metrics
for this rate limiter.
"""
self.our_server_name = our_server_name
self.clock = clock
self.metrics_name = metrics_name
@@ -296,7 +302,10 @@ class _PerHostRatelimiter:
if self.should_reject():
logger.debug("Ratelimiter(%s): rejecting request", self.host)
if self.metrics_name:
rate_limit_reject_counter.labels(self.metrics_name).inc()
rate_limit_reject_counter.labels(
rate_limiter_name=self.metrics_name,
**{SERVER_NAME_LABEL: self.our_server_name},
).inc()
raise LimitExceededError(
limiter_name="rc_federation",
retry_after_ms=int(self.window_size / self.sleep_limit),
@@ -333,7 +342,10 @@ class _PerHostRatelimiter:
self.sleep_sec,
)
if self.metrics_name:
rate_limit_sleep_counter.labels(self.metrics_name).inc()
rate_limit_sleep_counter.labels(
rate_limiter_name=self.metrics_name,
**{SERVER_NAME_LABEL: self.our_server_name},
).inc()
ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
self.sleeping_requests.add(request_id)
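
The ratelimiter hunks show the plumbing half of the refactor: the object that ultimately calls `.labels()` must learn the server name at construction time, so `our_server_name` is threaded through each constructor in the chain, and the call sites switch to keyword arguments to stay readable. The shape, condensed (signatures abridged from the diff):

```python
from typing import Callable, Optional

class _PerHostRatelimiter:
    def __init__(self, our_server_name: str, metrics_name: Optional[str] = None) -> None:
        # Stored so the reject/sleep counters can be labelled later.
        self.our_server_name = our_server_name
        self.metrics_name = metrics_name

class FederationRateLimiter:
    def __init__(self, our_server_name: str, metrics_name: Optional[str] = None) -> None:
        # The outer class only forwards the name down to the per-host
        # limiters it creates on demand.
        self._new_limiter: Callable[[], _PerHostRatelimiter] = lambda: _PerHostRatelimiter(
            our_server_name=our_server_name,
            metrics_name=metrics_name,
        )

limiter = FederationRateLimiter(
    our_server_name="example.com", metrics_name="federation_servlets"
)
```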

View File

@@ -867,7 +867,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
# First test a known access token
channel = FakeChannel(self.site, self.reactor)
# type-ignore: FakeChannel is a mock of an HTTPChannel, not a proper HTTPChannel
req = SynapseRequest(channel, self.site) # type: ignore[arg-type]
req = SynapseRequest(channel, self.site, self.hs.hostname) # type: ignore[arg-type]
req.client.host = EXAMPLE_IPV4_ADDR
req.requestHeaders.addRawHeader("Authorization", f"Bearer {known_token}")
req.requestHeaders.addRawHeader("User-Agent", EXAMPLE_USER_AGENT)
@@ -899,7 +899,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
MAS_USER_AGENT = "masmasmas"
channel = FakeChannel(self.site, self.reactor)
req = SynapseRequest(channel, self.site) # type: ignore[arg-type]
req = SynapseRequest(channel, self.site, self.hs.hostname) # type: ignore[arg-type]
req.client.host = MAS_IPV4_ADDR
req.requestHeaders.addRawHeader(
"Authorization", f"Bearer {self.auth._admin_token()}"

View File

@@ -90,6 +90,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
prev_state,
new_state,
is_mine=True,
our_server_name=self.hs.hostname,
wheel_timer=wheel_timer,
now=now,
persist=False,
@@ -137,6 +138,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
prev_state,
new_state,
is_mine=True,
our_server_name=self.hs.hostname,
wheel_timer=wheel_timer,
now=now,
persist=False,
@@ -187,6 +189,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
prev_state,
new_state,
is_mine=True,
our_server_name=self.hs.hostname,
wheel_timer=wheel_timer,
now=now,
persist=False,
@@ -235,6 +238,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
prev_state,
new_state,
is_mine=True,
our_server_name=self.hs.hostname,
wheel_timer=wheel_timer,
now=now,
persist=False,
@@ -275,6 +279,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
prev_state,
new_state,
is_mine=False,
our_server_name=self.hs.hostname,
wheel_timer=wheel_timer,
now=now,
persist=False,
@@ -314,6 +319,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
prev_state,
new_state,
is_mine=True,
our_server_name=self.hs.hostname,
wheel_timer=wheel_timer,
now=now,
persist=False,
@@ -341,6 +347,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
prev_state,
new_state,
is_mine=True,
our_server_name=self.hs.hostname,
wheel_timer=wheel_timer,
now=now,
persist=False,
@@ -431,6 +438,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
prev_state,
new_state,
is_mine=True,
our_server_name=self.hs.hostname,
wheel_timer=wheel_timer,
now=now,
persist=True,
@@ -494,6 +502,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
prev_state,
new_state,
is_mine=True,
our_server_name=self.hs.hostname,
wheel_timer=wheel_timer,
now=now,
persist=False,

View File

@@ -171,8 +171,9 @@ class TerseJsonTestCase(LoggerCleanupMixin, TestCase):
site.site_tag = "test-site"
site.server_version_string = "Server v1"
site.reactor = Mock()
request = SynapseRequest(
cast(HTTPChannel, FakeChannel(site, self.reactor)), site
cast(HTTPChannel, FakeChannel(site, self.reactor)), site, "test_server"
)
# Call requestReceived to finish instantiating the object.
request.content = BytesIO()

View File

@@ -99,7 +99,7 @@ class RemoteKeyResourceTestCase(BaseRemoteKeyResourceTestCase):
"""
channel = FakeChannel(self.site, self.reactor)
# channel is a `FakeChannel` but `HTTPChannel` is expected
req = SynapseRequest(channel, self.site) # type: ignore[arg-type]
req = SynapseRequest(channel, self.site, self.hs.hostname) # type: ignore[arg-type]
req.content = BytesIO(b"")
req.requestReceived(
b"GET",
@@ -201,7 +201,7 @@ class EndToEndPerspectivesTests(BaseRemoteKeyResourceTestCase):
channel = FakeChannel(self.site, self.reactor)
# channel is a `FakeChannel` but `HTTPChannel` is expected
req = SynapseRequest(channel, self.site) # type: ignore[arg-type]
req = SynapseRequest(channel, self.site, self.hs.hostname) # type: ignore[arg-type]
req.content = BytesIO(encode_canonical_json(data))
req.requestReceived(

View File

@@ -432,7 +432,7 @@ def make_request(
channel = FakeChannel(site, reactor, ip=client_ip)
req = request(channel, site)
req = request(channel, site, our_server_name="test_server")
channel.request = req
req.content = BytesIO(content)

View File

@@ -37,7 +37,7 @@ class FederationRateLimiterTestCase(TestCase):
"""A simple test with the default values"""
reactor, clock = get_clock()
rc_config = build_rc_config()
ratelimiter = FederationRateLimiter(clock, rc_config)
ratelimiter = FederationRateLimiter("test_server", clock, rc_config)
with ratelimiter.ratelimit("testhost") as d1:
# shouldn't block
@@ -47,7 +47,7 @@ class FederationRateLimiterTestCase(TestCase):
"""Test what happens when we hit the concurrent limit"""
reactor, clock = get_clock()
rc_config = build_rc_config({"rc_federation": {"concurrent": 2}})
ratelimiter = FederationRateLimiter(clock, rc_config)
ratelimiter = FederationRateLimiter("test_server", clock, rc_config)
with ratelimiter.ratelimit("testhost") as d1:
# shouldn't block
@@ -74,7 +74,7 @@ class FederationRateLimiterTestCase(TestCase):
rc_config = build_rc_config(
{"rc_federation": {"sleep_limit": 2, "sleep_delay": 500}}
)
ratelimiter = FederationRateLimiter(clock, rc_config)
ratelimiter = FederationRateLimiter("test_server", clock, rc_config)
with ratelimiter.ratelimit("testhost") as d1:
# shouldn't block
@@ -105,7 +105,7 @@ class FederationRateLimiterTestCase(TestCase):
}
}
)
ratelimiter = FederationRateLimiter(clock, rc_config)
ratelimiter = FederationRateLimiter("test_server", clock, rc_config)
with ratelimiter.ratelimit("testhost") as d:
# shouldn't block