Mirror of https://github.com/element-hq/synapse.git (synced 2025-12-07 01:20:16 +00:00)

Compare commits: rei/remove ... dependabot (2 commits)

Commits:
- f7103be392
- 2c236be058
changelog.d/18656.misc (new file, 1 line)
@@ -0,0 +1 @@
+Refactor `Counter` metrics to be homeserver-scoped.
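The changelog line summarises the whole pattern of this comparison: every `prometheus_client` Counter gains a per-homeserver label so that several Synapse instances sharing one process (and one global metrics registry) keep their time series separate. A minimal, hedged sketch of the before/after shape, assuming `SERVER_NAME_LABEL` resolves to the string "server_name" as in `synapse.metrics`:

# Illustrative sketch only; the metric name and server name are made up.
from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # assumed value of synapse.metrics.SERVER_NAME_LABEL

# Before: one process-global counter shared by every homeserver.
# requests_counter = Counter("example_requests", "Example requests", ["method"])

# After: the counter declares the extra label ...
requests_counter = Counter(
    "example_requests",
    "Example requests",
    labelnames=["method", SERVER_NAME_LABEL],
)

# ... and every increment supplies the owning homeserver's name.
requests_counter.labels(method="GET", **{SERVER_NAME_LABEL: "example.com"}).inc()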
poetry.lock (generated, 51 lines changed)

@@ -39,7 +39,7 @@ description = "The ultimate Python library in building OAuth and OpenID Connect
 optional = true
 python-versions = ">=3.9"
 groups = ["main"]
-markers = "extra == \"oidc\" or extra == \"jwt\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"jwt\" or extra == \"oidc\""
 files = [
     {file = "authlib-1.6.1-py2.py3-none-any.whl", hash = "sha256:e9d2031c34c6309373ab845afc24168fe9e93dc52d252631f52642f21f5ed06e"},
     {file = "authlib-1.6.1.tar.gz", hash = "sha256:4dffdbb1460ba6ec8c17981a4c67af7d8af131231b5a36a88a1e8c80c111cdfd"},
@@ -435,7 +435,7 @@ description = "XML bomb protection for Python stdlib modules"
 optional = true
 python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
 groups = ["main"]
-markers = "extra == \"saml2\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"saml2\""
 files = [
     {file = "defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61"},
     {file = "defusedxml-0.7.1.tar.gz", hash = "sha256:1bb3032db185915b62d7c6209c5a8792be6a32ab2fedacc84e01b52c51aa3e69"},
@@ -478,7 +478,7 @@ description = "XPath 1.0/2.0/3.0/3.1 parsers and selectors for ElementTree and l
 optional = true
 python-versions = ">=3.7"
 groups = ["main"]
-markers = "extra == \"saml2\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"saml2\""
 files = [
     {file = "elementpath-4.1.5-py3-none-any.whl", hash = "sha256:2ac1a2fb31eb22bbbf817f8cf6752f844513216263f0e3892c8e79782fe4bb55"},
     {file = "elementpath-4.1.5.tar.gz", hash = "sha256:c2d6dc524b29ef751ecfc416b0627668119d8812441c555d7471da41d4bacb8d"},
@@ -504,18 +504,19 @@ smmap = ">=3.0.1,<6"
 
 [[package]]
 name = "gitpython"
-version = "3.1.44"
+version = "3.1.45"
 description = "GitPython is a Python library used to interact with Git repositories"
 optional = false
 python-versions = ">=3.7"
 groups = ["dev"]
 files = [
-    {file = "GitPython-3.1.44-py3-none-any.whl", hash = "sha256:9e0e10cda9bed1ee64bc9a6de50e7e38a9c9943241cd7f585f6df3ed28011110"},
-    {file = "gitpython-3.1.44.tar.gz", hash = "sha256:c87e30b26253bf5418b01b0660f818967f3c503193838337fe5e573331249269"},
+    {file = "gitpython-3.1.45-py3-none-any.whl", hash = "sha256:8908cb2e02fb3b93b7eb0f2827125cb699869470432cc885f019b8fd0fccff77"},
+    {file = "gitpython-3.1.45.tar.gz", hash = "sha256:85b0ee964ceddf211c41b9f27a49086010a190fd8132a24e21f362a4b36a791c"},
 ]
 
 [package.dependencies]
 gitdb = ">=4.0.1,<5"
+typing-extensions = {version = ">=3.10.0.2", markers = "python_version < \"3.10\""}
 
 [package.extras]
 doc = ["sphinx (>=7.1.2,<7.2)", "sphinx-autodoc-typehints", "sphinx_rtd_theme"]
@@ -528,7 +529,7 @@ description = "Python wrapper for hiredis"
 optional = true
 python-versions = ">=3.8"
 groups = ["main"]
-markers = "extra == \"redis\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"redis\""
 files = [
     {file = "hiredis-3.2.1-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:add17efcbae46c5a6a13b244ff0b4a8fa079602ceb62290095c941b42e9d5dec"},
     {file = "hiredis-3.2.1-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:5fe955cc4f66c57df1ae8e5caf4de2925d43b5efab4e40859662311d1bcc5f54"},
@@ -865,7 +866,7 @@ description = "Jaeger Python OpenTracing Tracer implementation"
 optional = true
 python-versions = ">=3.7"
 groups = ["main"]
-markers = "extra == \"opentracing\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"opentracing\""
 files = [
     {file = "jaeger-client-4.8.0.tar.gz", hash = "sha256:3157836edab8e2c209bd2d6ae61113db36f7ee399e66b1dcbb715d87ab49bfe0"},
 ]
@@ -1003,7 +1004,7 @@ description = "A strictly RFC 4510 conforming LDAP V3 pure Python client library
 optional = true
 python-versions = "*"
 groups = ["main"]
-markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\""
 files = [
     {file = "ldap3-2.9.1-py2.py3-none-any.whl", hash = "sha256:5869596fc4948797020d3f03b7939da938778a0f9e2009f7a072ccf92b8e8d70"},
     {file = "ldap3-2.9.1.tar.gz", hash = "sha256:f3e7fc4718e3f09dda568b57100095e0ce58633bcabbed8667ce3f8fbaa4229f"},
@@ -1019,7 +1020,7 @@ description = "Powerful and Pythonic XML processing library combining libxml2/li
 optional = true
 python-versions = ">=3.8"
 groups = ["main"]
-markers = "extra == \"url-preview\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"url-preview\""
 files = [
     {file = "lxml-6.0.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:35bc626eec405f745199200ccb5c6b36f202675d204aa29bb52e27ba2b71dea8"},
     {file = "lxml-6.0.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:246b40f8a4aec341cbbf52617cad8ab7c888d944bfe12a6abd2b1f6cfb6f6082"},
@@ -1260,7 +1261,7 @@ description = "An LDAP3 auth provider for Synapse"
 optional = true
 python-versions = ">=3.7"
 groups = ["main"]
-markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\""
 files = [
     {file = "matrix-synapse-ldap3-0.3.0.tar.gz", hash = "sha256:8bb6517173164d4b9cc44f49de411d8cebdb2e705d5dd1ea1f38733c4a009e1d"},
     {file = "matrix_synapse_ldap3-0.3.0-py3-none-any.whl", hash = "sha256:8b4d701f8702551e98cc1d8c20dbed532de5613584c08d0df22de376ba99159d"},
@@ -1493,7 +1494,7 @@ description = "OpenTracing API for Python. See documentation at http://opentraci
 optional = true
 python-versions = "*"
 groups = ["main"]
-markers = "extra == \"opentracing\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"opentracing\""
 files = [
     {file = "opentracing-2.4.0.tar.gz", hash = "sha256:a173117e6ef580d55874734d1fa7ecb6f3655160b8b8974a2a1e98e5ec9c840d"},
 ]
@@ -1699,7 +1700,7 @@ description = "psycopg2 - Python-PostgreSQL Database Adapter"
 optional = true
 python-versions = ">=3.8"
 groups = ["main"]
-markers = "extra == \"postgres\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"postgres\""
 files = [
     {file = "psycopg2-2.9.10-cp310-cp310-win32.whl", hash = "sha256:5df2b672140f95adb453af93a7d669d7a7bf0a56bcd26f1502329166f4a61716"},
     {file = "psycopg2-2.9.10-cp310-cp310-win_amd64.whl", hash = "sha256:c6f7b8561225f9e711a9c47087388a97fdc948211c10a4bccbf0ba68ab7b3b5a"},
@@ -1720,7 +1721,7 @@ description = ".. image:: https://travis-ci.org/chtd/psycopg2cffi.svg?branch=mas
 optional = true
 python-versions = "*"
 groups = ["main"]
-markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")"
+markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")"
 files = [
     {file = "psycopg2cffi-2.9.0.tar.gz", hash = "sha256:7e272edcd837de3a1d12b62185eb85c45a19feda9e62fa1b120c54f9e8d35c52"},
 ]
@@ -1736,7 +1737,7 @@ description = "A Simple library to enable psycopg2 compatability"
 optional = true
 python-versions = "*"
 groups = ["main"]
-markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")"
+markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")"
 files = [
     {file = "psycopg2cffi-compat-1.1.tar.gz", hash = "sha256:d25e921748475522b33d13420aad5c2831c743227dc1f1f2585e0fdb5c914e05"},
 ]
@@ -1996,7 +1997,7 @@ description = "A development tool to measure, monitor and analyze the memory beh
 optional = true
 python-versions = ">=3.6"
 groups = ["main"]
-markers = "extra == \"cache-memory\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"cache-memory\""
 files = [
     {file = "Pympler-1.0.1-py3-none-any.whl", hash = "sha256:d260dda9ae781e1eab6ea15bacb84015849833ba5555f141d2d9b7b7473b307d"},
     {file = "Pympler-1.0.1.tar.gz", hash = "sha256:993f1a3599ca3f4fcd7160c7545ad06310c9e12f70174ae7ae8d4e25f6c5d3fa"},
@@ -2056,7 +2057,7 @@ description = "Python implementation of SAML Version 2 Standard"
 optional = true
 python-versions = ">=3.9,<4.0"
 groups = ["main"]
-markers = "extra == \"saml2\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"saml2\""
 files = [
     {file = "pysaml2-7.5.0-py3-none-any.whl", hash = "sha256:bc6627cc344476a83c757f440a73fda1369f13b6fda1b4e16bca63ffbabb5318"},
     {file = "pysaml2-7.5.0.tar.gz", hash = "sha256:f36871d4e5ee857c6b85532e942550d2cf90ea4ee943d75eb681044bbc4f54f7"},
@@ -2081,7 +2082,7 @@ description = "Extensions to the standard Python datetime module"
 optional = true
 python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
 groups = ["main"]
-markers = "extra == \"saml2\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"saml2\""
 files = [
     {file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
     {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"},
@@ -2109,7 +2110,7 @@ description = "World timezone definitions, modern and historical"
 optional = true
 python-versions = "*"
 groups = ["main"]
-markers = "extra == \"saml2\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"saml2\""
 files = [
     {file = "pytz-2022.7.1-py2.py3-none-any.whl", hash = "sha256:78f4f37d8198e0627c5f1143240bb0206b8691d8d7ac6d78fee88b78733f8c4a"},
     {file = "pytz-2022.7.1.tar.gz", hash = "sha256:01a0681c4b9684a28304615eba55d1ab31ae00bf68ec157ec3708a8182dbbcd0"},
@@ -2474,7 +2475,7 @@ description = "Python client for Sentry (https://sentry.io)"
 optional = true
 python-versions = ">=3.6"
 groups = ["main"]
-markers = "extra == \"sentry\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"sentry\""
 files = [
     {file = "sentry_sdk-2.32.0-py2.py3-none-any.whl", hash = "sha256:6cf51521b099562d7ce3606da928c473643abe99b00ce4cb5626ea735f4ec345"},
     {file = "sentry_sdk-2.32.0.tar.gz", hash = "sha256:9016c75d9316b0f6921ac14c8cd4fb938f26002430ac5be9945ab280f78bec6b"},
@@ -2662,7 +2663,7 @@ description = "Tornado IOLoop Backed Concurrent Futures"
 optional = true
 python-versions = "*"
 groups = ["main"]
-markers = "extra == \"opentracing\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"opentracing\""
 files = [
     {file = "threadloop-1.0.2-py2-none-any.whl", hash = "sha256:5c90dbefab6ffbdba26afb4829d2a9df8275d13ac7dc58dccb0e279992679599"},
     {file = "threadloop-1.0.2.tar.gz", hash = "sha256:8b180aac31013de13c2ad5c834819771992d350267bddb854613ae77ef571944"},
@@ -2678,7 +2679,7 @@ description = "Python bindings for the Apache Thrift RPC system"
 optional = true
 python-versions = "*"
 groups = ["main"]
-markers = "extra == \"opentracing\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"opentracing\""
 files = [
     {file = "thrift-0.16.0.tar.gz", hash = "sha256:2b5b6488fcded21f9d312aa23c9ff6a0195d0f6ae26ddbd5ad9e3e25dfc14408"},
 ]
@@ -2740,7 +2741,7 @@ description = "Tornado is a Python web framework and asynchronous networking lib
 optional = true
 python-versions = ">=3.9"
 groups = ["main"]
-markers = "extra == \"opentracing\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"opentracing\""
 files = [
     {file = "tornado-6.5-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:f81067dad2e4443b015368b24e802d0083fecada4f0a4572fdb72fc06e54a9a6"},
     {file = "tornado-6.5-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:9ac1cbe1db860b3cbb251e795c701c41d343f06a96049d6274e7c77559117e41"},
@@ -2877,7 +2878,7 @@ description = "non-blocking redis client for python"
 optional = true
 python-versions = "*"
 groups = ["main"]
-markers = "extra == \"redis\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"redis\""
 files = [
     {file = "txredisapi-1.4.11-py3-none-any.whl", hash = "sha256:ac64d7a9342b58edca13ef267d4fa7637c1aa63f8595e066801c1e8b56b22d0b"},
     {file = "txredisapi-1.4.11.tar.gz", hash = "sha256:3eb1af99aefdefb59eb877b1dd08861efad60915e30ad5bf3d5bf6c5cedcdbc6"},
@@ -3208,7 +3209,7 @@ description = "An XML Schema validator and decoder"
 optional = true
 python-versions = ">=3.7"
 groups = ["main"]
-markers = "extra == \"saml2\" or extra == \"all\""
+markers = "extra == \"all\" or extra == \"saml2\""
 files = [
     {file = "xmlschema-2.4.0-py3-none-any.whl", hash = "sha256:dc87be0caaa61f42649899189aab2fd8e0d567f2cf548433ba7b79278d231a4a"},
     {file = "xmlschema-2.4.0.tar.gz", hash = "sha256:d74cd0c10866ac609e1ef94a5a69b018ad16e39077bc6393408b40c6babee793"},

@@ -28,8 +28,13 @@ from typing import Callable, Optional, Tuple, Type, Union
 import mypy.types
 from mypy.erasetype import remove_instance_last_known_values
+from mypy.errorcodes import ErrorCode
-from mypy.nodes import ARG_NAMED_OPT, TempNode, Var
-from mypy.plugin import FunctionSigContext, MethodSigContext, Plugin
+from mypy.nodes import ARG_NAMED_OPT, ListExpr, NameExpr, TempNode, Var
+from mypy.plugin import (
+    FunctionLike,
+    FunctionSigContext,
+    MethodSigContext,
+    Plugin,
+)
 from mypy.typeops import bind_self
 from mypy.types import (
     AnyType,
@@ -43,8 +48,26 @@ from mypy.types import (
     UnionType,
 )
 
+PROMETHEUS_METRIC_MISSING_SERVER_NAME_LABEL = ErrorCode(
+    "missing-server-name-label",
+    "`SERVER_NAME_LABEL` required in metric",
+    category="per-homeserver-tenant-metrics",
+)
+
 
 class SynapsePlugin(Plugin):
+    def get_function_signature_hook(
+        self, fullname: str
+    ) -> Optional[Callable[[FunctionSigContext], FunctionLike]]:
+        if fullname in (
+            "prometheus_client.metrics.Counter",
+            # TODO: Add other prometheus_client metrics that need checking as we
+            # refactor, see https://github.com/element-hq/synapse/issues/18592
+        ):
+            return check_prometheus_metric_instantiation
+
+        return None
+
     def get_method_signature_hook(
         self, fullname: str
     ) -> Optional[Callable[[MethodSigContext], CallableType]]:
@@ -65,6 +88,85 @@ class SynapsePlugin:
         return None
 
 
+def check_prometheus_metric_instantiation(ctx: FunctionSigContext) -> CallableType:
+    """
+    Ensure that the `prometheus_client` metrics include the `SERVER_NAME_LABEL` label
+    when instantiated.
+
+    This is important because we support multiple Synapse instances running in the same
+    process, where all metrics share a single global `REGISTRY`. The `server_name` label
+    ensures metrics are correctly separated by homeserver.
+
+    There are also some metrics that apply at the process level, such as CPU usage,
+    Python garbage collection, Twisted reactor tick time which shouldn't have the
+    `SERVER_NAME_LABEL`. In those cases, use a type ignore comment to disable the
+    check, e.g. `# type: ignore[missing-server-name-label]`.
+    """
+    # The true signature, this isn't being modified so this is what will be returned.
+    signature: CallableType = ctx.default_signature
+
+    # Sanity check the arguments are still as expected in this version of
+    # `prometheus_client`. ex. `Counter(name, documentation, labelnames, ...)`
+    #
+    # `signature.arg_names` should be: ["name", "documentation", "labelnames", ...]
+    if len(signature.arg_names) < 3 or signature.arg_names[2] != "labelnames":
+        ctx.api.fail(
+            f"Expected the 3rd argument of {signature.name} to be 'labelnames', but got "
+            f"{signature.arg_names[2]}",
+            ctx.context,
+        )
+        return signature
+
+    # Ensure mypy is passing the correct number of arguments because we are doing some
+    # dirty indexing into `ctx.args` later on.
+    assert len(ctx.args) == len(signature.arg_names), (
+        f"Expected the list of arguments in the {signature.name} signature ({len(signature.arg_names)})"
+        f" to match the number of arguments from the function signature context ({len(ctx.args)})"
+    )
+
+    # Check if the `labelnames` argument includes `SERVER_NAME_LABEL`
+    #
+    # `ctx.args` should look like this:
+    # ```
+    # [
+    #     [StrExpr("name")],
+    #     [StrExpr("documentation")],
+    #     [ListExpr([StrExpr("label1"), StrExpr("label2")])]
+    #     ...
+    # ]
+    # ```
+    labelnames_arg_expression = ctx.args[2][0] if len(ctx.args[2]) > 0 else None
+    if isinstance(labelnames_arg_expression, ListExpr):
+        # Check if the `labelnames` argument includes the `server_name` label (`SERVER_NAME_LABEL`).
+        for labelname_expression in labelnames_arg_expression.items:
+            if (
+                isinstance(labelname_expression, NameExpr)
+                and labelname_expression.fullname == "synapse.metrics.SERVER_NAME_LABEL"
+            ):
+                # Found the `SERVER_NAME_LABEL`, all good!
+                break
+        else:
+            ctx.api.fail(
+                f"Expected {signature.name} to include `SERVER_NAME_LABEL` in the list of labels. "
+                "If this is a process-level metric (vs homeserver-level), use a type ignore comment "
+                "to disable this check.",
+                ctx.context,
+                code=PROMETHEUS_METRIC_MISSING_SERVER_NAME_LABEL,
+            )
+    else:
+        ctx.api.fail(
+            f"Expected the `labelnames` argument of {signature.name} to be a list of label names "
+            f"(including `SERVER_NAME_LABEL`), but got {labelnames_arg_expression}. "
+            "If this is a process-level metric (vs homeserver-level), use a type ignore comment "
+            "to disable this check.",
+            ctx.context,
+            code=PROMETHEUS_METRIC_MISSING_SERVER_NAME_LABEL,
+        )
+        return signature
+
+    return signature
+
+
 def _get_true_return_type(signature: CallableType) -> mypy.types.Type:
     """
     Get the "final" return type of a callable which might return an Awaitable/Deferred.

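The plugin's docstring above describes the escape hatch for metrics that are deliberately process-wide rather than homeserver-scoped: suppress the custom error code at the call site. A hedged sketch of what that looks like (the metric itself is invented for illustration):

# Illustrative only: a process-level metric that intentionally has no
# `server_name` label, with the plugin check silenced via the
# `missing-server-name-label` error code defined above.
from prometheus_client import Counter

gc_collections_counter = Counter(  # type: ignore[missing-server-name-label]
    "example_process_gc_collections",
    "Example process-level counter that is not homeserver-scoped",
)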
@@ -48,6 +48,7 @@ from synapse.events import EventBase
 from synapse.events.utils import SerializeEventConfig, serialize_event
 from synapse.http.client import SimpleHttpClient, is_unknown_endpoint
 from synapse.logging import opentracing
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.types import DeviceListUpdates, JsonDict, JsonMapping, ThirdPartyInstanceID
 from synapse.util.caches.response_cache import ResponseCache
@@ -59,29 +60,31 @@ logger = logging.getLogger(__name__)
 sent_transactions_counter = Counter(
     "synapse_appservice_api_sent_transactions",
     "Number of /transactions/ requests sent",
-    ["service"],
+    labelnames=["service", SERVER_NAME_LABEL],
 )
 
 failed_transactions_counter = Counter(
     "synapse_appservice_api_failed_transactions",
     "Number of /transactions/ requests that failed to send",
-    ["service"],
+    labelnames=["service", SERVER_NAME_LABEL],
 )
 
 sent_events_counter = Counter(
-    "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
+    "synapse_appservice_api_sent_events",
+    "Number of events sent to the AS",
+    labelnames=["service", SERVER_NAME_LABEL],
 )
 
 sent_ephemeral_counter = Counter(
     "synapse_appservice_api_sent_ephemeral",
     "Number of ephemeral events sent to the AS",
-    ["service"],
+    labelnames=["service", SERVER_NAME_LABEL],
 )
 
 sent_todevice_counter = Counter(
     "synapse_appservice_api_sent_todevice",
     "Number of todevice messages sent to the AS",
-    ["service"],
+    labelnames=["service", SERVER_NAME_LABEL],
 )
 
 HOUR_IN_MS = 60 * 60 * 1000
@@ -382,6 +385,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 "left": list(device_list_summary.left),
             }
 
+        labels = {"service": service.id, SERVER_NAME_LABEL: self.server_name}
         try:
             args = None
             if self.config.use_appservice_legacy_authorization:
@@ -399,10 +403,10 @@ class ApplicationServiceApi(SimpleHttpClient):
                 service.url,
                 [event.get("event_id") for event in events],
             )
-            sent_transactions_counter.labels(service.id).inc()
-            sent_events_counter.labels(service.id).inc(len(serialized_events))
-            sent_ephemeral_counter.labels(service.id).inc(len(ephemeral))
-            sent_todevice_counter.labels(service.id).inc(len(to_device_messages))
+            sent_transactions_counter.labels(**labels).inc()
+            sent_events_counter.labels(**labels).inc(len(serialized_events))
+            sent_ephemeral_counter.labels(**labels).inc(len(ephemeral))
+            sent_todevice_counter.labels(**labels).inc(len(to_device_messages))
             return True
         except CodeMessageException as e:
             logger.warning(
@@ -421,7 +425,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 ex.args,
                 exc_info=logger.isEnabledFor(logging.DEBUG),
             )
-            failed_transactions_counter.labels(service.id).inc()
+            failed_transactions_counter.labels(**labels).inc()
             return False
 
     async def claim_client_keys(

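The appservice hunks above build the label values once and reuse them for every counter that shares the same label set, which keeps the call sites short. A small, hedged illustration of the same pattern (all names here are made up, not Synapse's real metrics):

# Sketch of reusing one label dict across several counters with identical labelnames.
from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # assumed value of synapse.metrics.SERVER_NAME_LABEL

requests_ok = Counter("example_ok", "Successful requests", labelnames=["service", SERVER_NAME_LABEL])
requests_failed = Counter("example_failed", "Failed requests", labelnames=["service", SERVER_NAME_LABEL])

labels = {"service": "my-bridge", SERVER_NAME_LABEL: "example.com"}
requests_ok.labels(**labels).inc()
requests_failed.labels(**labels).inc()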
@@ -74,6 +74,7 @@ from synapse.federation.transport.client import SendJoinResponse
 from synapse.http.client import is_unknown_endpoint
 from synapse.http.types import QueryParams
 from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.types import JsonDict, StrCollection, UserID, get_domain_from_id
 from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
 from synapse.util.async_helpers import concurrently_execute
@@ -85,7 +86,9 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
-sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
+sent_queries_counter = Counter(
+    "synapse_federation_client_sent_queries", "", labelnames=["type", SERVER_NAME_LABEL]
+)
 
 
 PDU_RETRY_TIME_MS = 1 * 60 * 1000
@@ -209,7 +212,10 @@ class FederationClient(FederationBase):
         Returns:
             The JSON object from the response
         """
-        sent_queries_counter.labels(query_type).inc()
+        sent_queries_counter.labels(
+            type=query_type,
+            **{SERVER_NAME_LABEL: self.server_name},
+        ).inc()
 
         return await self.transport_layer.make_query(
             destination,
@@ -231,7 +237,10 @@ class FederationClient(FederationBase):
         Returns:
             The JSON object from the response
         """
-        sent_queries_counter.labels("client_device_keys").inc()
+        sent_queries_counter.labels(
+            type="client_device_keys",
+            **{SERVER_NAME_LABEL: self.server_name},
+        ).inc()
         return await self.transport_layer.query_client_keys(
             destination, content, timeout
         )
@@ -242,7 +251,10 @@ class FederationClient(FederationBase):
         """Query the device keys for a list of user ids hosted on a remote
         server.
         """
-        sent_queries_counter.labels("user_devices").inc()
+        sent_queries_counter.labels(
+            type="user_devices",
+            **{SERVER_NAME_LABEL: self.server_name},
+        ).inc()
         return await self.transport_layer.query_user_devices(
             destination, user_id, timeout
         )
@@ -264,7 +276,10 @@ class FederationClient(FederationBase):
         Returns:
             The JSON object from the response
         """
-        sent_queries_counter.labels("client_one_time_keys").inc()
+        sent_queries_counter.labels(
+            type="client_one_time_keys",
+            **{SERVER_NAME_LABEL: self.server_name},
+        ).inc()
 
         # Convert the query with counts into a stable and unstable query and check
         # if attempting to claim more than 1 OTK.

@@ -82,6 +82,7 @@ from synapse.logging.opentracing import (
     tag_args,
     trace,
 )
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
@@ -104,12 +105,18 @@ TRANSACTION_CONCURRENCY_LIMIT = 10
 
 logger = logging.getLogger(__name__)
 
-received_pdus_counter = Counter("synapse_federation_server_received_pdus", "")
+received_pdus_counter = Counter(
+    "synapse_federation_server_received_pdus", "", labelnames=[SERVER_NAME_LABEL]
+)
 
-received_edus_counter = Counter("synapse_federation_server_received_edus", "")
+received_edus_counter = Counter(
+    "synapse_federation_server_received_edus", "", labelnames=[SERVER_NAME_LABEL]
+)
 
 received_queries_counter = Counter(
-    "synapse_federation_server_received_queries", "", ["type"]
+    "synapse_federation_server_received_queries",
+    "",
+    labelnames=["type", SERVER_NAME_LABEL],
 )
 
 pdu_process_time = Histogram(
@@ -434,7 +441,9 @@ class FederationServer(FederationBase):
         report back to the sending server.
         """
 
-        received_pdus_counter.inc(len(transaction.pdus))
+        received_pdus_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc(
+            len(transaction.pdus)
+        )
 
         origin_host, _ = parse_server_name(origin)
 
@@ -553,7 +562,7 @@ class FederationServer(FederationBase):
         """Process the EDUs in a received transaction."""
 
         async def _process_edu(edu_dict: JsonDict) -> None:
-            received_edus_counter.inc()
+            received_edus_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()
 
             edu = Edu(
                 origin=origin,
@@ -668,7 +677,10 @@ class FederationServer(FederationBase):
     async def on_query_request(
         self, query_type: str, args: Dict[str, str]
     ) -> Tuple[int, Dict[str, Any]]:
-        received_queries_counter.labels(query_type).inc()
+        received_queries_counter.labels(
+            type=query_type,
+            **{SERVER_NAME_LABEL: self.server_name},
+        ).inc()
         resp = await self.registry.on_query(query_type, args)
         return 200, resp

@@ -160,6 +160,7 @@ from synapse.federation.sender.transaction_manager import TransactionManager
 from synapse.federation.units import Edu
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics import (
+    SERVER_NAME_LABEL,
     LaterGauge,
     event_processing_loop_counter,
     event_processing_loop_room_count,
@@ -189,11 +190,13 @@ logger = logging.getLogger(__name__)
 sent_pdus_destination_dist_count = Counter(
     "synapse_federation_client_sent_pdu_destinations_count",
     "Number of PDUs queued for sending to one or more destinations",
+    labelnames=[SERVER_NAME_LABEL],
 )
 
 sent_pdus_destination_dist_total = Counter(
     "synapse_federation_client_sent_pdu_destinations",
     "Total number of PDUs queued for sending across all destinations",
+    labelnames=[SERVER_NAME_LABEL],
 )
 
 # Time (in s) to wait before trying to wake up destinations that have
@@ -708,13 +711,19 @@ class FederationSender(AbstractFederationSender):
                 "federation_sender"
             ).set(ts)
 
-            events_processed_counter.inc(len(event_entries))
+            events_processed_counter.labels(
+                **{SERVER_NAME_LABEL: self.server_name}
+            ).inc(len(event_entries))
 
-            event_processing_loop_room_count.labels("federation_sender").inc(
-                len(events_by_room)
-            )
+            event_processing_loop_room_count.labels(
+                name="federation_sender",
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).inc(len(events_by_room))
 
-            event_processing_loop_counter.labels("federation_sender").inc()
+            event_processing_loop_counter.labels(
+                name="federation_sender",
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).inc()
 
             synapse.metrics.event_processing_positions.labels(
                 "federation_sender"
@@ -735,8 +744,12 @@ class FederationSender(AbstractFederationSender):
         if not destinations:
            return
 
-        sent_pdus_destination_dist_total.inc(len(destinations))
-        sent_pdus_destination_dist_count.inc()
+        sent_pdus_destination_dist_total.labels(
+            **{SERVER_NAME_LABEL: self.server_name}
+        ).inc(len(destinations))
+        sent_pdus_destination_dist_count.labels(
+            **{SERVER_NAME_LABEL: self.server_name}
+        ).inc()
 
         assert pdu.internal_metadata.stream_ordering

@@ -40,7 +40,7 @@ from synapse.federation.units import Edu
 from synapse.handlers.presence import format_user_presence_state
 from synapse.logging import issue9533_logger
 from synapse.logging.opentracing import SynapseTags, set_tag
-from synapse.metrics import sent_transactions_counter
+from synapse.metrics import SERVER_NAME_LABEL, sent_transactions_counter
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import JsonDict, ReadReceipt
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -56,13 +56,15 @@ logger = logging.getLogger(__name__)
 
 
 sent_edus_counter = Counter(
-    "synapse_federation_client_sent_edus", "Total number of EDUs successfully sent"
+    "synapse_federation_client_sent_edus",
+    "Total number of EDUs successfully sent",
+    labelnames=[SERVER_NAME_LABEL],
 )
 
 sent_edus_by_type = Counter(
     "synapse_federation_client_sent_edus_by_type",
     "Number of sent EDUs successfully sent, by event type",
-    ["type"],
+    labelnames=["type", SERVER_NAME_LABEL],
 )
 
 
@@ -368,10 +370,17 @@ class PerDestinationQueue:
                 self._destination, pending_pdus, pending_edus
             )
 
-            sent_transactions_counter.inc()
-            sent_edus_counter.inc(len(pending_edus))
+            sent_transactions_counter.labels(
+                **{SERVER_NAME_LABEL: self.server_name}
+            ).inc()
+            sent_edus_counter.labels(
+                **{SERVER_NAME_LABEL: self.server_name}
+            ).inc(len(pending_edus))
             for edu in pending_edus:
-                sent_edus_by_type.labels(edu.edu_type).inc()
+                sent_edus_by_type.labels(
+                    type=edu.edu_type,
+                    **{SERVER_NAME_LABEL: self.server_name},
+                ).inc()
 
         except NotRetryingDestination as e:
             logger.debug(
@@ -596,7 +605,9 @@ class PerDestinationQueue:
                 self._destination, room_catchup_pdus, []
             )
 
-            sent_transactions_counter.inc()
+            sent_transactions_counter.labels(
+                **{SERVER_NAME_LABEL: self.server_name}
+            ).inc()
 
             # We pulled this from the DB, so it'll be non-null
             assert pdu.internal_metadata.stream_ordering

@@ -42,6 +42,7 @@ from synapse.events import EventBase
 from synapse.handlers.presence import format_user_presence_state
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics import (
+    SERVER_NAME_LABEL,
     event_processing_loop_counter,
     event_processing_loop_room_count,
 )
@@ -68,7 +69,9 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
-events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
+events_processed_counter = Counter(
+    "synapse_handlers_appservice_events_processed", "", labelnames=[SERVER_NAME_LABEL]
+)
 
 
 class ApplicationServicesHandler:
@@ -207,13 +210,19 @@ class ApplicationServicesHandler:
                 "appservice_sender"
             ).set(upper_bound)
 
-            events_processed_counter.inc(len(events))
+            events_processed_counter.labels(
+                **{SERVER_NAME_LABEL: self.server_name}
+            ).inc(len(events))
 
-            event_processing_loop_room_count.labels("appservice_sender").inc(
-                len(events_by_room)
-            )
+            event_processing_loop_room_count.labels(
+                name="appservice_sender",
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).inc(len(events_by_room))
 
-            event_processing_loop_counter.labels("appservice_sender").inc()
+            event_processing_loop_counter.labels(
+                name="appservice_sender",
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).inc()
 
             if events:
                 now = self.clock.time_msec()

@@ -70,6 +70,7 @@ from synapse.http import get_request_user_agent
 from synapse.http.server import finish_request, respond_with_html
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import defer_to_thread
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases.main.registration import (
     LoginTokenExpired,
@@ -95,7 +96,7 @@ INVALID_USERNAME_OR_PASSWORD = "Invalid username or password"
 invalid_login_token_counter = Counter(
     "synapse_user_login_invalid_login_tokens",
     "Counts the number of rejected m.login.token on /login",
-    ["reason"],
+    labelnames=["reason", SERVER_NAME_LABEL],
 )
 
@@ -1478,11 +1479,20 @@ class AuthHandler:
         try:
             return await self.store.consume_login_token(login_token)
         except LoginTokenExpired:
-            invalid_login_token_counter.labels("expired").inc()
+            invalid_login_token_counter.labels(
+                reason="expired",
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).inc()
         except LoginTokenReused:
-            invalid_login_token_counter.labels("reused").inc()
+            invalid_login_token_counter.labels(
+                reason="reused",
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).inc()
         except NotFoundError:
-            invalid_login_token_counter.labels("not found").inc()
+            invalid_login_token_counter.labels(
+                reason="not found",
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).inc()
 
         raise AuthError(403, "Invalid login token", errcode=Codes.FORBIDDEN)

@@ -1444,6 +1444,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
     def __init__(self, hs: "HomeServer", device_handler: DeviceWriterHandler):
         super().__init__(hs)
 
+        self.server_name = hs.hostname
         self.federation = hs.get_federation_client()
         self.server_name = hs.hostname  # nb must be called this for @measure_func
         self.clock = hs.get_clock()  # nb must be called this for @measure_func

@@ -76,6 +76,7 @@ from synapse.logging.opentracing import (
     tag_args,
     trace,
 )
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.federation import (
     ReplicationFederationSendEventsRestServlet,
@@ -105,6 +106,7 @@ logger = logging.getLogger(__name__)
 soft_failed_event_counter = Counter(
     "synapse_federation_soft_failed_events_total",
     "Events received over federation that we marked as soft_failed",
+    labelnames=[SERVER_NAME_LABEL],
 )
 
 # Added to debug performance and track progress on optimizations
@@ -2051,7 +2053,9 @@ class FederationEventHandler:
                 "hs": origin,
             },
         )
-        soft_failed_event_counter.inc()
+        soft_failed_event_counter.labels(
+            **{SERVER_NAME_LABEL: self.server_name}
+        ).inc()
         event.internal_metadata.soft_failed = True
 
     async def _load_or_fetch_auth_events_for_event(

@@ -105,7 +105,7 @@ from synapse.api.presence import UserDevicePresenceState, UserPresenceState
 from synapse.appservice import ApplicationService
 from synapse.events.presence_router import PresenceRouter
 from synapse.logging.context import run_in_background
-from synapse.metrics import LaterGauge
+from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -137,24 +137,40 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
+notified_presence_counter = Counter(
+    "synapse_handler_presence_notified_presence", "", labelnames=[SERVER_NAME_LABEL]
+)
 federation_presence_out_counter = Counter(
-    "synapse_handler_presence_federation_presence_out", ""
+    "synapse_handler_presence_federation_presence_out",
+    "",
+    labelnames=[SERVER_NAME_LABEL],
 )
-presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
-timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
+presence_updates_counter = Counter(
+    "synapse_handler_presence_presence_updates", "", labelnames=[SERVER_NAME_LABEL]
+)
+timers_fired_counter = Counter(
+    "synapse_handler_presence_timers_fired", "", labelnames=[SERVER_NAME_LABEL]
+)
 federation_presence_counter = Counter(
-    "synapse_handler_presence_federation_presence", ""
+    "synapse_handler_presence_federation_presence", "", labelnames=[SERVER_NAME_LABEL]
 )
-bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
+bump_active_time_counter = Counter(
+    "synapse_handler_presence_bump_active_time", "", labelnames=[SERVER_NAME_LABEL]
+)
 
-get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
+get_updates_counter = Counter(
+    "synapse_handler_presence_get_updates", "", labelnames=["type", SERVER_NAME_LABEL]
+)
 
 notify_reason_counter = Counter(
-    "synapse_handler_presence_notify_reason", "", ["locality", "reason"]
+    "synapse_handler_presence_notify_reason",
+    "",
+    labelnames=["locality", "reason", SERVER_NAME_LABEL],
 )
 state_transition_counter = Counter(
-    "synapse_handler_presence_state_transition", "", ["locality", "from", "to"]
+    "synapse_handler_presence_state_transition",
+    "",
+    labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
 )
 
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
@@ -668,7 +684,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
             old_state = self.user_to_current_state.get(new_state.user_id)
             self.user_to_current_state[new_state.user_id] = new_state
             is_mine = self.is_mine_id(new_state.user_id)
-            if not old_state or should_notify(old_state, new_state, is_mine):
+            if not old_state or should_notify(
+                old_state, new_state, is_mine, self.server_name
+            ):
                 state_to_notify.append(new_state)
 
         stream_id = token
@@ -977,6 +995,7 @@ class PresenceHandler(BasePresenceHandler):
                 prev_state,
                 new_state,
                 is_mine=self.is_mine_id(user_id),
+                our_server_name=self.server_name,
                 wheel_timer=self.wheel_timer,
                 now=now,
                 # When overriding disabled presence, don't kick off all the
@@ -996,10 +1015,14 @@ class PresenceHandler(BasePresenceHandler):
 
         # TODO: We should probably ensure there are no races hereafter
 
-        presence_updates_counter.inc(len(new_states))
+        presence_updates_counter.labels(
+            **{SERVER_NAME_LABEL: self.server_name}
+        ).inc(len(new_states))
 
         if to_notify:
-            notified_presence_counter.inc(len(to_notify))
+            notified_presence_counter.labels(
+                **{SERVER_NAME_LABEL: self.server_name}
+            ).inc(len(to_notify))
             await self._persist_and_notify(list(to_notify.values()))
 
         self.unpersisted_users_changes |= {s.user_id for s in new_states}
@@ -1018,7 +1041,9 @@ class PresenceHandler(BasePresenceHandler):
             if user_id not in to_notify
         }
         if to_federation_ping:
-            federation_presence_out_counter.inc(len(to_federation_ping))
+            federation_presence_out_counter.labels(
+                **{SERVER_NAME_LABEL: self.server_name}
+            ).inc(len(to_federation_ping))
 
             hosts_to_states = await get_interested_remotes(
                 self.store,
@@ -1068,7 +1093,9 @@ class PresenceHandler(BasePresenceHandler):
             for user_id in users_to_check
         ]
 
-        timers_fired_counter.inc(len(states))
+        timers_fired_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc(
+            len(states)
+        )
 
         # Set of user ID & device IDs which are currently syncing.
         syncing_user_devices = {
@@ -1102,7 +1129,7 @@ class PresenceHandler(BasePresenceHandler):
 
         user_id = user.to_string()
 
-        bump_active_time_counter.inc()
+        bump_active_time_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()
 
         now = self.clock.time_msec()
 
@@ -1354,7 +1381,9 @@ class PresenceHandler(BasePresenceHandler):
             updates.append(prev_state.copy_and_replace(**new_fields))
 
         if updates:
-            federation_presence_counter.inc(len(updates))
+            federation_presence_counter.labels(
+                **{SERVER_NAME_LABEL: self.server_name}
+            ).inc(len(updates))
             await self._update_states(updates)
 
     async def set_state(
@@ -1667,7 +1696,10 @@ class PresenceHandler(BasePresenceHandler):
 
 
 def should_notify(
-    old_state: UserPresenceState, new_state: UserPresenceState, is_mine: bool
+    old_state: UserPresenceState,
+    new_state: UserPresenceState,
+    is_mine: bool,
+    our_server_name: str,
 ) -> bool:
     """Decides if a presence state change should be sent to interested parties."""
     user_location = "remote"
@@ -1678,19 +1710,38 @@ def should_notify(
         return False
 
     if old_state.status_msg != new_state.status_msg:
-        notify_reason_counter.labels(user_location, "status_msg_change").inc()
+        notify_reason_counter.labels(
+            locality=user_location,
+            reason="status_msg_change",
+            **{SERVER_NAME_LABEL: our_server_name},
+        ).inc()
         return True
 
     if old_state.state != new_state.state:
-        notify_reason_counter.labels(user_location, "state_change").inc()
+        notify_reason_counter.labels(
+            locality=user_location,
+            reason="state_change",
+            **{SERVER_NAME_LABEL: our_server_name},
+        ).inc()
         state_transition_counter.labels(
-            user_location, old_state.state, new_state.state
+            **{
+                "locality": user_location,
+                # `from` is a reserved word in Python so we have to label it this way if
+                # we want to use keyword args.
+                "from": old_state.state,
+                "to": new_state.state,
+                SERVER_NAME_LABEL: our_server_name,
+            },
         ).inc()
         return True
 
     if old_state.state == PresenceState.ONLINE:
         if new_state.currently_active != old_state.currently_active:
-            notify_reason_counter.labels(user_location, "current_active_change").inc()
+            notify_reason_counter.labels(
+                locality=user_location,
+                reason="current_active_change",
+                **{SERVER_NAME_LABEL: our_server_name},
+            ).inc()
             return True
 
         if (
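One detail worth noting in the hunk above: `.labels()` accepts either positional or keyword values, but `from` is a Python keyword, so the `state_transition_counter` call has to supply its values through dict unpacking rather than keyword arguments. A hedged, self-contained illustration (metric name and values are invented):

# Illustrative only; shows why `**{...}` is used when a label is named "from".
from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # assumed value of synapse.metrics.SERVER_NAME_LABEL

transitions = Counter(
    "example_state_transition",
    "",
    labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)

# `transitions.labels(locality="local", from="online", ...)` would be a SyntaxError,
# so the values are passed as an unpacked dict instead.
transitions.labels(
    **{
        "locality": "local",
        "from": "online",
        "to": "unavailable",
        SERVER_NAME_LABEL: "example.com",
    }
).inc()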
@@ -1700,14 +1751,18 @@ def should_notify(
         # Only notify about last active bumps if we're not currently active
         if not new_state.currently_active:
             notify_reason_counter.labels(
-                user_location, "last_active_change_online"
+                locality=user_location,
+                reason="last_active_change_online",
+                **{SERVER_NAME_LABEL: our_server_name},
             ).inc()
             return True
 
     elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
         # Always notify for a transition where last active gets bumped.
         notify_reason_counter.labels(
-            user_location, "last_active_change_not_online"
+            locality=user_location,
+            reason="last_active_change_not_online",
+            **{SERVER_NAME_LABEL: our_server_name},
         ).inc()
         return True
 
@@ -1774,6 +1829,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
+        self.server_name = hs.hostname
         self.get_presence_handler = hs.get_presence_handler
         self.get_presence_router = hs.get_presence_router
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.store = hs.get_datastores().main
 
@@ -1885,7 +1941,10 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
 
             # If we have the full list of changes for presence we can
            # simply check which ones share a room with the user.
-            get_updates_counter.labels("stream").inc()
+            get_updates_counter.labels(
+                type="stream",
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).inc()
 
             sharing_users = await self.store.do_users_share_a_room(
                 user_id, updated_users
@@ -1898,7 +1957,10 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
         else:
             # Too many possible updates. Find all users we can see and check
             # if any of them have changed.
-            get_updates_counter.labels("full").inc()
+            get_updates_counter.labels(
+                type="full",
+                **{SERVER_NAME_LABEL: self.server_name},
+            ).inc()
 
             users_interested_in = (
                 await self.store.get_users_who_share_room_with_user(user_id)
@@ -2148,6 +2210,7 @@ def handle_update(
     prev_state: UserPresenceState,
     new_state: UserPresenceState,
     is_mine: bool,
+    our_server_name: str,
     wheel_timer: WheelTimer,
     now: int,
     persist: bool,
@@ -2160,6 +2223,7 @@ def handle_update(
         prev_state
         new_state
         is_mine: Whether the user is ours
+        our_server_name: The homeserver name of our server (`hs.hostname`)
        wheel_timer
         now: Time now in ms
         persist: True if this state should persist until another update occurs.
@@ -2228,7 +2292,7 @@ def handle_update(
     )
 
     # Check whether the change was something worth notifying about
-    if should_notify(prev_state, new_state, is_mine):
+    if should_notify(prev_state, new_state, is_mine, our_server_name):
         new_state = new_state.copy_and_replace(last_federation_update_ts=now)
         persist_and_notify = True

@@ -45,6 +45,7 @@ from synapse.api.errors import (
 from synapse.appservice import ApplicationService
 from synapse.config.server import is_threepid_reserved
 from synapse.http.servlet import assert_params_in_dict
+from synapse.metrics import SERVER_NAME_LABEL
 from synapse.replication.http.login import RegisterDeviceReplicationServlet
 from synapse.replication.http.register import (
     ReplicationPostRegisterActionsServlet,
@@ -62,29 +63,38 @@ logger = logging.getLogger(__name__)
 registration_counter = Counter(
     "synapse_user_registrations_total",
     "Number of new users registered (since restart)",
-    ["guest", "shadow_banned", "auth_provider"],
+    labelnames=["guest", "shadow_banned", "auth_provider", SERVER_NAME_LABEL],
 )
 
 login_counter = Counter(
     "synapse_user_logins_total",
     "Number of user logins (since restart)",
-    ["guest", "auth_provider"],
+    labelnames=["guest", "auth_provider", SERVER_NAME_LABEL],
 )
 
 
-def init_counters_for_auth_provider(auth_provider_id: str) -> None:
+def init_counters_for_auth_provider(auth_provider_id: str, server_name: str) -> None:
     """Ensure the prometheus counters for the given auth provider are initialised
 
     This fixes a problem where the counters are not reported for a given auth provider
     until the user first logs in/registers.
+
+    Args:
+        auth_provider_id: The ID of the auth provider to initialise counters for.
+        server_name: Our server name (used to label metrics) (this should be `hs.hostname`).
     """
     for is_guest in (True, False):
-        login_counter.labels(guest=is_guest, auth_provider=auth_provider_id)
+        login_counter.labels(
+            guest=is_guest,
+            auth_provider=auth_provider_id,
+            **{SERVER_NAME_LABEL: server_name},
+        )
         for shadow_banned in (True, False):
             registration_counter.labels(
                 guest=is_guest,
                 shadow_banned=shadow_banned,
                 auth_provider=auth_provider_id,
+                **{SERVER_NAME_LABEL: server_name},
             )
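The helper above exists because a labelled `prometheus_client` Counter only starts being exported for a given label combination once `.labels(...)` has been called for it; touching every combination at startup makes the time series appear (at zero) before the first login or registration happens. A hedged sketch of the same idea with made-up names:

# Illustrative sketch: creating the child series up front registers it with the
# exporter, so it shows up as 0 before the counter is ever incremented.
from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # assumed value of synapse.metrics.SERVER_NAME_LABEL

logins = Counter(
    "example_logins_total", "Example logins", labelnames=["guest", SERVER_NAME_LABEL]
)

def init_login_counter(server_name: str) -> None:
    for is_guest in (True, False):
        # No .inc() here; merely creating the child is enough for it to be scraped.
        logins.labels(guest=is_guest, **{SERVER_NAME_LABEL: server_name})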
@@ -97,6 +107,7 @@ class LoginDict(TypedDict):
 
 class RegistrationHandler:
     def __init__(self, hs: "HomeServer"):
+        self.server_name = hs.hostname
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self.clock = hs.get_clock()
@@ -112,7 +123,6 @@ class RegistrationHandler:
         self._account_validity_handler = hs.get_account_validity_handler()
         self._user_consent_version = self.hs.config.consent.user_consent_version
         self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
-        self._server_name = hs.hostname
         self._user_types_config = hs.config.user_types
 
         self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
@@ -138,7 +148,9 @@ class RegistrationHandler:
         )
         self.refresh_token_lifetime = hs.config.registration.refresh_token_lifetime
 
-        init_counters_for_auth_provider("")
+        init_counters_for_auth_provider(
+            auth_provider_id="", server_name=self.server_name
+        )
 
     async def check_username(
         self,
@@ -362,6 +374,7 @@ class RegistrationHandler:
             guest=make_guest,
             shadow_banned=shadow_banned,
             auth_provider=(auth_provider_id or ""),
+            **{SERVER_NAME_LABEL: self.server_name},
         ).inc()
 
         # If the user does not need to consent at registration, auto-join any
@@ -422,7 +435,7 @@ class RegistrationHandler:
         if self.hs.config.registration.auto_join_user_id:
             fake_requester = create_requester(
                 self.hs.config.registration.auto_join_user_id,
-                authenticated_entity=self._server_name,
+                authenticated_entity=self.server_name,
             )
 
             # If the room requires an invite, add the user to the list of invites.
@@ -435,7 +448,7 @@ class RegistrationHandler:
                 requires_join = True
             else:
                 fake_requester = create_requester(
-                    user_id, authenticated_entity=self._server_name
+                    user_id, authenticated_entity=self.server_name
                 )
 
         # Choose whether to federate the new room.
@@ -467,7 +480,7 @@ class RegistrationHandler:
 
                 await room_member_handler.update_membership(
                     requester=create_requester(
-                        user_id, authenticated_entity=self._server_name
+                        user_id, authenticated_entity=self.server_name
                     ),
                     target=UserID.from_string(user_id),
                     room_id=room_id,
@@ -493,7 +506,7 @@ class RegistrationHandler:
             if requires_join:
                 await room_member_handler.update_membership(
                     requester=create_requester(
-                        user_id, authenticated_entity=self._server_name
+                        user_id, authenticated_entity=self.server_name
                     ),
                     target=UserID.from_string(user_id),
                     room_id=room_id,
@@ -539,7 +552,7 @@ class RegistrationHandler:
             # we don't have a local user in the room to craft up an invite with.
             requires_invite = await self.store.is_host_joined(
                 room_id,
-                self._server_name,
+                self.server_name,
             )
 
             if requires_invite:
@@ -567,7 +580,7 @@ class RegistrationHandler:
                 await room_member_handler.update_membership(
                     requester=create_requester(
                         self.hs.config.registration.auto_join_user_id,
-                        authenticated_entity=self._server_name,
+                        authenticated_entity=self.server_name,
                     ),
                     target=UserID.from_string(user_id),
                     room_id=room_id,
@@ -579,7 +592,7 @@ class RegistrationHandler:
                 # Send the join.
                 await room_member_handler.update_membership(
                     requester=create_requester(
-                        user_id, authenticated_entity=self._server_name
+                        user_id, authenticated_entity=self.server_name
                     ),
                     target=UserID.from_string(user_id),
                     room_id=room_id,
@@ -790,6 +803,7 @@ class RegistrationHandler:
             login_counter.labels(
                 guest=is_guest,
                 auth_provider=(auth_provider_id or ""),
+                **{SERVER_NAME_LABEL: self.server_name},
            ).inc()
 
         return (

@@ -202,7 +202,7 @@ class SsoHandler:
     def __init__(self, hs: "HomeServer"):
         self._clock = hs.get_clock()
         self._store = hs.get_datastores().main
-        self._server_name = hs.hostname
+        self.server_name = hs.hostname
         self._is_mine_server_name = hs.is_mine_server_name
         self._registration_handler = hs.get_registration_handler()
         self._auth_handler = hs.get_auth_handler()
@@ -238,7 +238,9 @@ class SsoHandler:
         p_id = p.idp_id
         assert p_id not in self._identity_providers
         self._identity_providers[p_id] = p
-        init_counters_for_auth_provider(p_id)
+        init_counters_for_auth_provider(
+            auth_provider_id=p_id, server_name=self.server_name
+        )
 
     def get_identity_providers(self) -> Mapping[str, SsoIdentityProvider]:
         """Get the configured identity providers"""
@@ -569,7 +571,7 @@ class SsoHandler:
                 return attributes
 
             # Check if this mxid already exists
-            user_id = UserID(attributes.localpart, self._server_name).to_string()
+            user_id = UserID(attributes.localpart, self.server_name).to_string()
             if not await self._store.get_users_by_id_case_insensitive(user_id):
                 # This mxid is free
                 break
@@ -907,7 +909,7 @@ class SsoHandler:
 
         # render an error page.
         html = self._bad_user_template.render(
-            server_name=self._server_name,
+            server_name=self.server_name,
            user_id_to_verify=user_id_to_verify,
         )
         respond_with_html(request, 200, html)
@@ -959,7 +961,7 @@ class SsoHandler:
 
         if contains_invalid_mxid_characters(localpart):
             raise SynapseError(400, "localpart is invalid: %s" % (localpart,))
-        user_id = UserID(localpart, self._server_name).to_string()
+        user_id = UserID(localpart, self.server_name).to_string()
         user_infos = await self._store.get_users_by_id_case_insensitive(user_id)
 
         logger.info("[session %s] users: %s", session_id, user_infos)

@@ -63,6 +63,7 @@ from synapse.logging.opentracing import (
start_active_span,
trace,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import PaginateFunction
@@ -104,7 +105,7 @@ non_empty_sync_counter = Counter(
"Count of non empty sync responses. type is initial_sync/full_state_sync"
"/incremental_sync. lazy_loaded indicates if lazy loaded members were "
"enabled for that request.",
["type", "lazy_loaded"],
labelnames=["type", "lazy_loaded", SERVER_NAME_LABEL],
)

# Store the cache that tracks which lazy-loaded members have been sent to a given
@@ -614,7 +615,11 @@ class SyncHandler:
lazy_loaded = "true"
else:
lazy_loaded = "false"
non_empty_sync_counter.labels(sync_label, lazy_loaded).inc()
non_empty_sync_counter.labels(
type=sync_label,
lazy_loaded=lazy_loaded,
**{SERVER_NAME_LABEL: self.server_name},
).inc()

return result
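Note (illustrative, not part of the diff): the hunks above all follow the same shape; SERVER_NAME_LABEL is added to labelnames and .labels() is switched to keyword arguments so the homeserver name can be passed alongside the existing labels. A minimal, self-contained sketch of the pattern, assuming "server_name" as the label value (the real constant lives in synapse.metrics):

from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # assumed value, for illustration only

example_sync_counter = Counter(
    "example_non_empty_sync",
    "Illustrative per-homeserver sync counter",
    labelnames=["type", "lazy_loaded", SERVER_NAME_LABEL],
)

def record_sync(sync_type: str, lazy_loaded: str, server_name: str) -> None:
    # keyword-style labels() mirrors the refactor above; the server name is
    # spread in via **{...} because the label name is held in a constant
    example_sync_counter.labels(
        type=sync_type,
        lazy_loaded=lazy_loaded,
        **{SERVER_NAME_LABEL: server_name},
    ).inc()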
@@ -85,6 +85,7 @@ from synapse.http.replicationagent import ReplicationAgent
from synapse.http.types import QueryParams
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import ISynapseReactor, StrSequence
from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
@@ -108,9 +109,13 @@ except ImportError:

logger = logging.getLogger(__name__)

outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
outgoing_requests_counter = Counter(
"synapse_http_client_requests", "", labelnames=["method", SERVER_NAME_LABEL]
)
incoming_responses_counter = Counter(
"synapse_http_client_responses", "", ["method", "code"]
"synapse_http_client_responses",
"",
labelnames=["method", "code", SERVER_NAME_LABEL],
)

# the type of the headers map, to be passed to the t.w.h.Headers.
@@ -346,6 +351,7 @@ class BaseHttpClient:
treq_args: Optional[Dict[str, Any]] = None,
):
self.hs = hs
self.server_name = hs.hostname
self.reactor = hs.get_reactor()

self._extra_treq_args = treq_args or {}
@@ -384,7 +390,9 @@ class BaseHttpClient:
RequestTimedOutError if the request times out before the headers are read

"""
outgoing_requests_counter.labels(method).inc()
outgoing_requests_counter.labels(
method=method, **{SERVER_NAME_LABEL: self.server_name}
).inc()

# log request but strip `access_token` (AS requests for example include this)
logger.debug("Sending request %s %s", method, redact_uri(uri))
@@ -438,7 +446,11 @@ class BaseHttpClient:

response = await make_deferred_yieldable(request_deferred)

incoming_responses_counter.labels(method, response.code).inc()
incoming_responses_counter.labels(
method=method,
code=response.code,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
logger.info(
"Received response to %s %s: %s",
method,
@@ -447,7 +459,11 @@ class BaseHttpClient:
)
return response
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
incoming_responses_counter.labels(
method=method,
code="ERR",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
logger.info(
"Error sending request to %s %s: %s %s",
method,
@@ -855,6 +871,7 @@ class ReplicationClient(BaseHttpClient):
hs: The HomeServer instance to pass in
"""
super().__init__(hs)
self.server_name = hs.hostname

# Use a pool, but a very small one.
pool = HTTPConnectionPool(self.reactor)
@@ -891,7 +908,9 @@ class ReplicationClient(BaseHttpClient):
RequestTimedOutError if the request times out before the headers are read

"""
outgoing_requests_counter.labels(method).inc()
outgoing_requests_counter.labels(
method=method, **{SERVER_NAME_LABEL: self.server_name}
).inc()

logger.debug("Sending request %s %s", method, uri)

@@ -948,7 +967,11 @@ class ReplicationClient(BaseHttpClient):

response = await make_deferred_yieldable(request_deferred)

incoming_responses_counter.labels(method, response.code).inc()
incoming_responses_counter.labels(
method=method,
code=response.code,
**{SERVER_NAME_LABEL: self.server_name},
).inc()
logger.info(
"Received response to %s %s: %s",
method,
@@ -957,7 +980,11 @@ class ReplicationClient(BaseHttpClient):
)
return response
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
incoming_responses_counter.labels(
method=method,
code="ERR",
**{SERVER_NAME_LABEL: self.server_name},
).inc()
logger.info(
"Error sending request to %s %s: %s %s",
method,
@@ -87,6 +87,7 @@ from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
@@ -99,10 +100,14 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)

outgoing_requests_counter = Counter(
"synapse_http_matrixfederationclient_requests", "", ["method"]
"synapse_http_matrixfederationclient_requests",
"",
labelnames=["method", SERVER_NAME_LABEL],
)
incoming_responses_counter = Counter(
"synapse_http_matrixfederationclient_responses", "", ["method", "code"]
"synapse_http_matrixfederationclient_responses",
"",
labelnames=["method", "code", SERVER_NAME_LABEL],
)

@@ -697,7 +702,9 @@ class MatrixFederationHttpClient:
_sec_timeout,
)

outgoing_requests_counter.labels(request.method).inc()
outgoing_requests_counter.labels(
method=request.method, **{SERVER_NAME_LABEL: self.server_name}
).inc()

try:
with Measure(
@@ -736,7 +743,9 @@ class MatrixFederationHttpClient:
raise RequestSendFailed(e, can_retry=True) from e

incoming_responses_counter.labels(
request.method, response.code
method=request.method,
code=response.code,
**{SERVER_NAME_LABEL: self.server_name},
).inc()

set_tag(tags.HTTP_STATUS_CODE, response.code)
@@ -27,40 +27,52 @@ from typing import Dict, Mapping, Set, Tuple
from prometheus_client.core import Counter, Histogram

from synapse.logging.context import current_context
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge

logger = logging.getLogger(__name__)

# total number of responses served, split by method/servlet/tag
response_count = Counter(
"synapse_http_server_response_count", "", ["method", "servlet", "tag"]
"synapse_http_server_response_count",
"",
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)

requests_counter = Counter(
"synapse_http_server_requests_received", "", ["method", "servlet"]
"synapse_http_server_requests_received",
"",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)

outgoing_responses_counter = Counter(
"synapse_http_server_responses", "", ["method", "code"]
"synapse_http_server_responses",
"",
labelnames=["method", "code", SERVER_NAME_LABEL],
)

response_timer = Histogram(
"synapse_http_server_response_time_seconds",
"sec",
["method", "servlet", "tag", "code"],
labelnames=["method", "servlet", "tag", "code", SERVER_NAME_LABEL],
)

response_ru_utime = Counter(
"synapse_http_server_response_ru_utime_seconds", "sec", ["method", "servlet", "tag"]
"synapse_http_server_response_ru_utime_seconds",
"sec",
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)

response_ru_stime = Counter(
"synapse_http_server_response_ru_stime_seconds", "sec", ["method", "servlet", "tag"]
"synapse_http_server_response_ru_stime_seconds",
"sec",
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)

response_db_txn_count = Counter(
"synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"]
"synapse_http_server_response_db_txn_count",
"",
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)

# seconds spent waiting for db txns, excluding scheduling time, when processing
@@ -68,34 +80,42 @@ response_db_txn_count = Counter(
response_db_txn_duration = Counter(
"synapse_http_server_response_db_txn_duration_seconds",
"",
["method", "servlet", "tag"],
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)

# seconds spent waiting for a db connection, when processing this request
response_db_sched_duration = Counter(
"synapse_http_server_response_db_sched_duration_seconds",
"",
["method", "servlet", "tag"],
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)

# size in bytes of the response written
response_size = Counter(
"synapse_http_server_response_size", "", ["method", "servlet", "tag"]
"synapse_http_server_response_size",
"",
labelnames=["method", "servlet", "tag", SERVER_NAME_LABEL],
)

# In flight metrics are incremented while the requests are in flight, rather
# than when the response was written.

in_flight_requests_ru_utime = Counter(
"synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"]
"synapse_http_server_in_flight_requests_ru_utime_seconds",
"",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)

in_flight_requests_ru_stime = Counter(
"synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"]
"synapse_http_server_in_flight_requests_ru_stime_seconds",
"",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)

in_flight_requests_db_txn_count = Counter(
"synapse_http_server_in_flight_requests_db_txn_count", "", ["method", "servlet"]
"synapse_http_server_in_flight_requests_db_txn_count",
"",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)

# seconds spent waiting for db txns, excluding scheduling time, when processing
@@ -103,14 +123,14 @@ in_flight_requests_db_txn_count = Counter(
in_flight_requests_db_txn_duration = Counter(
"synapse_http_server_in_flight_requests_db_txn_duration_seconds",
"",
["method", "servlet"],
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)

# seconds spent waiting for a db connection, when processing this request
in_flight_requests_db_sched_duration = Counter(
"synapse_http_server_in_flight_requests_db_sched_duration_seconds",
"",
["method", "servlet"],
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)

_in_flight_requests: Set["RequestMetrics"] = set()
@@ -149,6 +169,13 @@ LaterGauge(

class RequestMetrics:
def __init__(self, our_server_name: str) -> None:
"""
Args:
our_server_name: Our homeserver name (used to label metrics) (`hs.hostname`)
"""
self.our_server_name = our_server_name

def start(self, time_sec: float, name: str, method: str) -> None:
self.start_ts = time_sec
self.start_context = current_context()
@@ -194,33 +221,39 @@ class RequestMetrics:

response_code_str = str(response_code)

outgoing_responses_counter.labels(self.method, response_code_str).inc()
outgoing_responses_counter.labels(
method=self.method,
code=response_code_str,
**{SERVER_NAME_LABEL: self.our_server_name},
).inc()

response_count.labels(self.method, self.name, tag).inc()
response_base_labels = {
"method": self.method,
"servlet": self.name,
"tag": tag,
SERVER_NAME_LABEL: self.our_server_name,
}

response_timer.labels(self.method, self.name, tag, response_code_str).observe(
response_count.labels(**response_base_labels).inc()

response_timer.labels(code=response_code_str, **response_base_labels).observe(
time_sec - self.start_ts
)

resource_usage = context.get_resource_usage()

response_ru_utime.labels(self.method, self.name, tag).inc(
resource_usage.ru_utime
)
response_ru_stime.labels(self.method, self.name, tag).inc(
resource_usage.ru_stime
)
response_db_txn_count.labels(self.method, self.name, tag).inc(
response_ru_utime.labels(**response_base_labels).inc(resource_usage.ru_utime)
response_ru_stime.labels(**response_base_labels).inc(resource_usage.ru_stime)
response_db_txn_count.labels(**response_base_labels).inc(
resource_usage.db_txn_count
)
response_db_txn_duration.labels(self.method, self.name, tag).inc(
response_db_txn_duration.labels(**response_base_labels).inc(
resource_usage.db_txn_duration_sec
)
response_db_sched_duration.labels(self.method, self.name, tag).inc(
response_db_sched_duration.labels(**response_base_labels).inc(
resource_usage.db_sched_duration_sec
)

response_size.labels(self.method, self.name, tag).inc(sent_bytes)
response_size.labels(**response_base_labels).inc(sent_bytes)

# We always call this at the end to ensure that we update the metrics
# regardless of whether a call to /metrics while the request was in
@@ -240,24 +273,30 @@ class RequestMetrics:
diff = new_stats - self._request_stats
self._request_stats = new_stats

in_flight_labels = {
"method": self.method,
"servlet": self.name,
SERVER_NAME_LABEL: self.our_server_name,
}

# max() is used since rapid use of ru_stime/ru_utime can end up with the
# count going backwards due to NTP, time smearing, fine-grained
# correction, or floating points. Who knows, really?
in_flight_requests_ru_utime.labels(self.method, self.name).inc(
in_flight_requests_ru_utime.labels(**in_flight_labels).inc(
max(diff.ru_utime, 0)
)
in_flight_requests_ru_stime.labels(self.method, self.name).inc(
in_flight_requests_ru_stime.labels(**in_flight_labels).inc(
max(diff.ru_stime, 0)
)

in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
in_flight_requests_db_txn_count.labels(**in_flight_labels).inc(
diff.db_txn_count
)

in_flight_requests_db_txn_duration.labels(self.method, self.name).inc(
in_flight_requests_db_txn_duration.labels(**in_flight_labels).inc(
diff.db_txn_duration_sec
)

in_flight_requests_db_sched_duration.labels(self.method, self.name).inc(
in_flight_requests_db_sched_duration.labels(**in_flight_labels).inc(
diff.db_sched_duration_sec
)
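Note (illustrative, not part of the diff): the RequestMetrics change above avoids repeating the same method/servlet/tag/server_name tuple by building one dict and unpacking it into each metric that shares the label set. A standalone sketch of that idea, with placeholder metric names rather than Synapse's:

from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # assumed label name, for illustration

a_counter = Counter("example_a_total", "", labelnames=["method", "servlet", SERVER_NAME_LABEL])
b_counter = Counter("example_b_total", "", labelnames=["method", "servlet", SERVER_NAME_LABEL])

def report(method: str, servlet: str, server_name: str, a: int, b: int) -> None:
    # one shared label dict, unpacked into every metric with the same labelnames
    base_labels = {"method": method, "servlet": servlet, SERVER_NAME_LABEL: server_name}
    a_counter.labels(**base_labels).inc(a)
    b_counter.labels(**base_labels).inc(b)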
@@ -44,6 +44,7 @@ from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import ISynapseReactor, Requester

if TYPE_CHECKING:
@@ -83,12 +84,14 @@ class SynapseRequest(Request):
self,
channel: HTTPChannel,
site: "SynapseSite",
our_server_name: str,
*args: Any,
max_request_body_size: int = 1024,
request_id_header: Optional[str] = None,
**kw: Any,
):
super().__init__(channel, *args, **kw)
self.our_server_name = our_server_name
self._max_request_body_size = max_request_body_size
self.request_id_header = request_id_header
self.synapse_site = site
@@ -334,7 +337,11 @@ class SynapseRequest(Request):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.labels(self.get_method(), self.request_metrics.name).inc()
requests_counter.labels(
method=self.get_method(),
servlet=self.request_metrics.name,
**{SERVER_NAME_LABEL: self.our_server_name},
).inc()

@contextlib.contextmanager
def processing(self) -> Generator[None, None, None]:
@@ -455,7 +462,7 @@ class SynapseRequest(Request):
self.request_metrics.name.
"""
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics = RequestMetrics(our_server_name=self.our_server_name)
self.request_metrics.start(
self.start_time, name=servlet_name, method=self.get_method()
)
@@ -694,6 +701,7 @@ class SynapseSite(ProxySite):

self.site_tag = site_tag
self.reactor: ISynapseReactor = reactor
self.server_name = hs.hostname

assert config.http_options is not None
proxied = config.http_options.x_forwarded
@@ -705,6 +713,7 @@ class SynapseSite(ProxySite):
return request_class(
channel,
self,
our_server_name=self.server_name,
max_request_body_size=max_request_body_size,
queued=queued,
request_id_header=request_id_header,
@@ -91,6 +91,7 @@ terms, an endpoint you can scrape is called an *instance*, usually corresponding
single process." (source: https://prometheus.io/docs/concepts/jobs_instances/)
"""

CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
"""
Content type of the latest text format for Prometheus metrics.
@@ -471,18 +472,24 @@ REGISTRY.register(CPUMetrics())
# Federation Metrics
#

sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
sent_transactions_counter = Counter(
"synapse_federation_client_sent_transactions", "", labelnames=[SERVER_NAME_LABEL]
)

events_processed_counter = Counter("synapse_federation_client_events_processed", "")
events_processed_counter = Counter(
"synapse_federation_client_events_processed", "", labelnames=[SERVER_NAME_LABEL]
)

event_processing_loop_counter = Counter(
"synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
"synapse_event_processing_loop_count",
"Event processing loop iterations",
labelnames=["name", SERVER_NAME_LABEL],
)

event_processing_loop_room_count = Counter(
"synapse_event_processing_loop_room_count",
"Rooms seen per event processing loop iteration",
["name"],
labelnames=["name", SERVER_NAME_LABEL],
)
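Note (illustrative, not part of the diff): once a previously label-less counter such as sent_transactions_counter gains labelnames=[SERVER_NAME_LABEL], a bare .inc() is no longer valid and every call site has to resolve a labelled child first. A minimal sketch of the before/after, with placeholder names:

from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # assumed label name, for illustration

example_counter = Counter(
    "example_sent_transactions",
    "Illustrative per-homeserver total",
    labelnames=[SERVER_NAME_LABEL],
)

def on_transaction_sent(server_name: str) -> None:
    # example_counter.inc() would fail now that the metric is labelled;
    # the child for this homeserver has to be looked up first
    example_counter.labels(**{SERVER_NAME_LABEL: server_name}).inc()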
@@ -50,7 +50,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
ISynapseReactor,
@@ -74,10 +74,15 @@ if TYPE_CHECKING:

logger = logging.getLogger(__name__)

notified_events_counter = Counter("synapse_notifier_notified_events", "")
# FIXME: Unused metric, remove if not needed.
notified_events_counter = Counter(
"synapse_notifier_notified_events", "", labelnames=[SERVER_NAME_LABEL]
)

users_woken_by_stream_counter = Counter(
"synapse_notifier_users_woken_by_stream", "", ["stream"]
"synapse_notifier_users_woken_by_stream",
"",
labelnames=["stream", SERVER_NAME_LABEL],
)

T = TypeVar("T")
@@ -224,6 +229,7 @@ class Notifier:
self.room_to_user_streams: Dict[str, Set[_NotifierUserStream]] = {}

self.hs = hs
self.server_name = hs.hostname
self._storage_controllers = hs.get_storage_controllers()
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastores().main
@@ -350,9 +356,10 @@ class Notifier:
for listener in listeners:
listener.callback(current_token)

users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
len(user_streams)
)
users_woken_by_stream_counter.labels(
stream=StreamKeyType.UN_PARTIAL_STATED_ROOMS,
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(user_streams))

# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
@@ -575,7 +582,10 @@ class Notifier:
listener.callback(current_token)

if user_streams:
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
users_woken_by_stream_counter.labels(
stream=stream_key,
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(user_streams))

self.notify_replication()
@@ -50,6 +50,7 @@ from synapse.event_auth import auth_types_for_event, get_user_power_level
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import SERVER_NAME_LABEL
from synapse.state import CREATE_KEY, POWER_KEY
from synapse.storage.databases.main.roommember import EventIdMembership
from synapse.storage.invite_rule import InviteRule
@@ -68,11 +69,17 @@ if TYPE_CHECKING:

logger = logging.getLogger(__name__)

# FIXME: Unused metric, remove if not needed.
push_rules_invalidation_counter = Counter(
"synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", ""
"synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter",
"",
labelnames=[SERVER_NAME_LABEL],
)
# FIXME: Unused metric, remove if not needed.
push_rules_state_size_counter = Counter(
"synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", ""
"synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter",
"",
labelnames=[SERVER_NAME_LABEL],
)
@@ -31,6 +31,7 @@ from twisted.internet.interfaces import IDelayedCall
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.logging import opentracing
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.storage.databases.main.event_push_actions import HttpPushAction
@@ -46,21 +47,25 @@ logger = logging.getLogger(__name__)
http_push_processed_counter = Counter(
"synapse_http_httppusher_http_pushes_processed",
"Number of push notifications successfully sent",
labelnames=[SERVER_NAME_LABEL],
)

http_push_failed_counter = Counter(
"synapse_http_httppusher_http_pushes_failed",
"Number of push notifications which failed",
labelnames=[SERVER_NAME_LABEL],
)

http_badges_processed_counter = Counter(
"synapse_http_httppusher_badge_updates_processed",
"Number of badge updates successfully sent",
labelnames=[SERVER_NAME_LABEL],
)

http_badges_failed_counter = Counter(
"synapse_http_httppusher_badge_updates_failed",
"Number of badge updates which failed",
labelnames=[SERVER_NAME_LABEL],
)

@@ -268,7 +273,9 @@ class HttpPusher(Pusher):
processed = await self._process_one(push_action)

if processed:
http_push_processed_counter.inc()
http_push_processed_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action.stream_ordering
pusher_still_exists = (
@@ -292,7 +299,9 @@ class HttpPusher(Pusher):
self.app_id, self.pushkey, self.user_id, self.failing_since
)
else:
http_push_failed_counter.inc()
http_push_failed_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
if not self.failing_since:
self.failing_since = self.clock.time_msec()
await self.store.update_pusher_failing_since(
@@ -543,9 +552,13 @@ class HttpPusher(Pusher):
}
try:
await self.http_client.post_json_get_json(self.url, d)
http_badges_processed_counter.inc()
http_badges_processed_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
except Exception as e:
logger.warning(
"Failed to send badge count to %s: %s %s", self.name, type(e), e
)
http_badges_failed_counter.inc()
http_badges_failed_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
@@ -32,6 +32,7 @@ from synapse.api.constants import EventContentFields, EventTypes, Membership, Ro
from synapse.api.errors import StoreError
from synapse.config.emailconfig import EmailSubjectConfig
from synapse.events import EventBase
from synapse.metrics import SERVER_NAME_LABEL
from synapse.push.presentable_names import (
calculate_room_name,
descriptor_from_member_events,
@@ -60,7 +61,7 @@ T = TypeVar("T")
emails_sent_counter = Counter(
"synapse_emails_sent_total",
"Emails sent by type",
["type"],
labelnames=["type", SERVER_NAME_LABEL],
)

@@ -123,6 +124,7 @@ class Mailer:
template_text: jinja2.Template,
):
self.hs = hs
self.server_name = hs.hostname
self.template_html = template_html
self.template_text = template_text

@@ -137,8 +139,6 @@ class Mailer:

logger.info("Created Mailer for app_name %s", app_name)

emails_sent_counter.labels("password_reset")

async def send_password_reset_mail(
self, email_address: str, token: str, client_secret: str, sid: str
) -> None:
@@ -162,7 +162,10 @@ class Mailer:

template_vars: TemplateVars = {"link": link}

emails_sent_counter.labels("password_reset").inc()
emails_sent_counter.labels(
type="password_reset",
**{SERVER_NAME_LABEL: self.server_name},
).inc()

await self.send_email(
email_address,
@@ -171,8 +174,6 @@ class Mailer:
template_vars,
)

emails_sent_counter.labels("registration")

async def send_registration_mail(
self, email_address: str, token: str, client_secret: str, sid: str
) -> None:
@@ -196,7 +197,10 @@ class Mailer:

template_vars: TemplateVars = {"link": link}

emails_sent_counter.labels("registration").inc()
emails_sent_counter.labels(
type="registration",
**{SERVER_NAME_LABEL: self.server_name},
).inc()

await self.send_email(
email_address,
@@ -205,8 +209,6 @@ class Mailer:
template_vars,
)

emails_sent_counter.labels("already_in_use")

async def send_already_in_use_mail(self, email_address: str) -> None:
"""Send an email if the address is already bound to an user account

@@ -214,6 +216,11 @@ class Mailer:
email_address: Email address we're sending to the "already in use" mail
"""

emails_sent_counter.labels(
type="already_in_use",
**{SERVER_NAME_LABEL: self.server_name},
).inc()

await self.send_email(
email_address,
self.email_subjects.email_already_in_use
@@ -221,8 +228,6 @@ class Mailer:
{},
)

emails_sent_counter.labels("add_threepid")

async def send_add_threepid_mail(
self, email_address: str, token: str, client_secret: str, sid: str
) -> None:
@@ -247,7 +252,10 @@ class Mailer:

template_vars: TemplateVars = {"link": link}

emails_sent_counter.labels("add_threepid").inc()
emails_sent_counter.labels(
type="add_threepid",
**{SERVER_NAME_LABEL: self.server_name},
).inc()

await self.send_email(
email_address,
@@ -256,8 +264,6 @@ class Mailer:
template_vars,
)

emails_sent_counter.labels("notification")

async def send_notification_mail(
self,
app_id: str,
@@ -352,7 +358,10 @@ class Mailer:
"reason": reason,
}

emails_sent_counter.labels("notification").inc()
emails_sent_counter.labels(
type="notification",
**{SERVER_NAME_LABEL: self.server_name},
).inc()

await self.send_email(
email_address, summary_text, template_vars, unsubscribe_link
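Note (illustrative, not part of the diff): the removed module-scope emails_sent_counter.labels("password_reset") calls existed to pre-create each label combination so the series were exported at zero before the first email was sent. With SERVER_NAME_LABEL added, that pre-creation can only happen once a homeserver name is known, so it would move into per-instance setup rather than import time. A sketch of that idea, using a hypothetical helper rather than code from the diff:

from prometheus_client import Counter

SERVER_NAME_LABEL = "server_name"  # assumed label name, for illustration

emails_counter = Counter(
    "example_emails_sent_total",
    "Emails sent by type",
    labelnames=["type", SERVER_NAME_LABEL],
)

def init_email_counters(server_name: str) -> None:
    # hypothetical helper: touching each label combination creates the child,
    # so the time series exist (at zero) as soon as this homeserver starts
    for email_type in ("password_reset", "registration", "add_threepid", "notification"):
        emails_counter.labels(type=email_type, **{SERVER_NAME_LABEL: server_name})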
@@ -38,6 +38,7 @@ from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing
from synapse.logging.opentracing import trace_with_opname
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.cancellation import is_function_cancellable
@@ -57,7 +58,7 @@ _pending_outgoing_requests = Gauge(
_outgoing_request_counter = Counter(
"synapse_outgoing_replication_requests",
"Number of outgoing replication requests, by replication method name and result",
["name", "code"],
labelnames=["name", "code", SERVER_NAME_LABEL],
)

@@ -205,6 +206,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
parameter to specify which instance to hit (the instance must be in
the `instance_map` config).
"""
server_name = hs.hostname
clock = hs.get_clock()
client = hs.get_replication_client()
local_instance_name = hs.get_instance_name()
@@ -333,15 +335,27 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
_outgoing_request_counter.labels(
name=cls.NAME,
code=e.code,
**{SERVER_NAME_LABEL: server_name},
).inc()
raise e.to_synapse_error()
except Exception as e:
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
_outgoing_request_counter.labels(
name=cls.NAME,
code="ERR",
**{SERVER_NAME_LABEL: server_name},
).inc()
raise SynapseError(
502, f"Failed to talk to {instance_name} process"
) from e

_outgoing_request_counter.labels(cls.NAME, 200).inc()
_outgoing_request_counter.labels(
name=cls.NAME,
code=200,
**{SERVER_NAME_LABEL: server_name},
).inc()

# Wait on any streams that the remote may have written to.
for stream_name, position in result.pop(
@@ -26,6 +26,7 @@ from prometheus_client import Counter, Histogram

from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable
from synapse.metrics import SERVER_NAME_LABEL
from synapse.util import json_decoder, json_encoder

if TYPE_CHECKING:
@@ -36,13 +37,13 @@ if TYPE_CHECKING:
set_counter = Counter(
"synapse_external_cache_set",
"Number of times we set a cache",
labelnames=["cache_name"],
labelnames=["cache_name", SERVER_NAME_LABEL],
)

get_counter = Counter(
"synapse_external_cache_get",
"Number of times we get a cache",
labelnames=["cache_name", "hit"],
labelnames=["cache_name", "hit", SERVER_NAME_LABEL],
)

response_timer = Histogram(
@@ -69,6 +70,8 @@ class ExternalCache:
"""

def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname

if hs.config.redis.redis_enabled:
self._redis_connection: Optional["ConnectionHandler"] = (
hs.get_outbound_redis_connection()
@@ -93,7 +96,9 @@ class ExternalCache:
if self._redis_connection is None:
return

set_counter.labels(cache_name).inc()
set_counter.labels(
cache_name=cache_name, **{SERVER_NAME_LABEL: self.server_name}
).inc()

# txredisapi requires the value to be string, bytes or numbers, so we
# encode stuff in JSON.
@@ -131,7 +136,11 @@ class ExternalCache:

logger.debug("Got cache result %s %s: %r", cache_name, key, result)

get_counter.labels(cache_name, result is not None).inc()
get_counter.labels(
cache_name=cache_name,
hit=result is not None,
**{SERVER_NAME_LABEL: self.server_name},
).inc()

if not result:
return None
@@ -40,7 +40,7 @@ from prometheus_client import Counter

from twisted.internet.protocol import ReconnectingClientFactory

from synapse.metrics import LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
@@ -85,13 +85,26 @@ logger = logging.getLogger(__name__)

# number of updates received for each RDATA stream
inbound_rdata_count = Counter(
"synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
"synapse_replication_tcp_protocol_inbound_rdata_count",
"",
labelnames=["stream_name", SERVER_NAME_LABEL],
)
user_sync_counter = Counter(
"synapse_replication_tcp_resource_user_sync", "", labelnames=[SERVER_NAME_LABEL]
)
federation_ack_counter = Counter(
"synapse_replication_tcp_resource_federation_ack",
"",
labelnames=[SERVER_NAME_LABEL],
)
# FIXME: Unused metric, remove if not needed.
remove_pusher_counter = Counter(
"synapse_replication_tcp_resource_remove_pusher", "", labelnames=[SERVER_NAME_LABEL]
)
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")

user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
user_ip_cache_counter = Counter(
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
)

# the type of the entries in _command_queues_by_stream
@@ -460,7 +473,7 @@ class ReplicationCommandHandler:
def on_USER_SYNC(
self, conn: IReplicationConnection, cmd: UserSyncCommand
) -> Optional[Awaitable[None]]:
user_sync_counter.inc()
user_sync_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()

if self._is_presence_writer:
return self._presence_handler.update_external_syncs_row(
@@ -484,7 +497,7 @@ class ReplicationCommandHandler:
def on_FEDERATION_ACK(
self, conn: IReplicationConnection, cmd: FederationAckCommand
) -> None:
federation_ack_counter.inc()
federation_ack_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()

if self._federation_sender:
self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
@@ -492,7 +505,7 @@ class ReplicationCommandHandler:
def on_USER_IP(
self, conn: IReplicationConnection, cmd: UserIpCommand
) -> Optional[Awaitable[None]]:
user_ip_cache_counter.inc()
user_ip_cache_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()

if self._is_master or self._should_insert_client_ips:
# We make a point of only returning an awaitable if there's actually
@@ -532,7 +545,9 @@ class ReplicationCommandHandler:
return

stream_name = cmd.stream_name
inbound_rdata_count.labels(stream_name).inc()
inbound_rdata_count.labels(
stream_name=stream_name, **{SERVER_NAME_LABEL: self.server_name}
).inc()

# We put the received command into a queue here for two reasons:
# 1. so we don't try and concurrently handle multiple rows for the
@@ -36,10 +36,7 @@ from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.interfaces import IAddress, IConnector
from twisted.python.failure import Failure

from synapse.logging.context import (
PreserveLoggingContext,
make_deferred_yieldable,
)
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import (
BackgroundProcessLoggingContext,

@@ -29,6 +29,7 @@ from prometheus_client import Counter
from twisted.internet.interfaces import IAddress
from twisted.internet.protocol import ServerFactory

from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
@@ -40,7 +41,9 @@ if TYPE_CHECKING:
from synapse.server import HomeServer

stream_updates_counter = Counter(
"synapse_replication_tcp_resource_stream_updates", "", ["stream_name"]
"synapse_replication_tcp_resource_stream_updates",
"",
labelnames=["stream_name", SERVER_NAME_LABEL],
)

logger = logging.getLogger(__name__)
@@ -226,7 +229,10 @@ class ReplicationStreamer:
len(updates),
current_token,
)
stream_updates_counter.labels(stream.NAME).inc(len(updates))
stream_updates_counter.labels(
stream_name=stream.NAME,
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(updates))

else:
# The token has advanced but there is no data to
@@ -323,10 +323,12 @@ class UsernameAvailabilityRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.server_name = hs.hostname
self.registration_handler = hs.get_registration_handler()
self.ratelimiter = FederationRateLimiter(
hs.get_clock(),
FederationRatelimitSettings(
our_server_name=self.server_name,
clock=hs.get_clock(),
config=FederationRatelimitSettings(
# Time window of 2s
window_size=2000,
# Artificially delay requests if rate > sleep_limit/window_size

@@ -849,7 +849,8 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_federation_ratelimiter(self) -> FederationRateLimiter:
return FederationRateLimiter(
self.get_clock(),
our_server_name=self.hostname,
clock=self.get_clock(),
config=self.config.ratelimiting.rc_federation,
metrics_name="federation_servlets",
)
@@ -51,6 +51,7 @@ from synapse.events.snapshot import (
)
from synapse.logging.context import ContextResourceUsage
from synapse.logging.opentracing import tag_args, trace
from synapse.metrics import SERVER_NAME_LABEL
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
@@ -607,11 +608,13 @@ _biggest_room_by_cpu_counter = Counter(
"synapse_state_res_cpu_for_biggest_room_seconds",
"CPU time spent performing state resolution for the single most expensive "
"room for state resolution",
labelnames=[SERVER_NAME_LABEL],
)
_biggest_room_by_db_counter = Counter(
"synapse_state_res_db_for_biggest_room_seconds",
"Database time spent performing state resolution for the single most "
"expensive room for state resolution",
labelnames=[SERVER_NAME_LABEL],
)

_cpu_times = Histogram(
@@ -880,7 +883,9 @@ class StateResolutionHandler:

# report info on the single biggest to prometheus
_, biggest_metrics = biggest[0]
prometheus_counter_metric.inc(extract_key(biggest_metrics))
prometheus_counter_metric.labels(**{SERVER_NAME_LABEL: self.server_name}).inc(
extract_key(biggest_metrics)
)

def _make_state_cache_entry(
@@ -61,6 +61,7 @@ from synapse.logging.opentracing import (
start_active_span_follows_from,
trace,
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
@@ -82,19 +83,23 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)

# The number of times we are recalculating the current state
state_delta_counter = Counter("synapse_storage_events_state_delta", "")
state_delta_counter = Counter(
"synapse_storage_events_state_delta", "", labelnames=[SERVER_NAME_LABEL]
)

# The number of times we are recalculating state when there is only a
# single forward extremity
state_delta_single_event_counter = Counter(
"synapse_storage_events_state_delta_single_event", ""
"synapse_storage_events_state_delta_single_event",
"",
labelnames=[SERVER_NAME_LABEL],
)

# The number of times we are reculating state when we could have resonably
# calculated the delta when we calculated the state for an event we were
# persisting.
state_delta_reuse_delta_counter = Counter(
"synapse_storage_events_state_delta_reuse_delta", ""
"synapse_storage_events_state_delta_reuse_delta", "", labelnames=[SERVER_NAME_LABEL]
)

# The number of forward extremities for each new event.
@@ -115,16 +120,19 @@ stale_forward_extremities_counter = Histogram(
state_resolutions_during_persistence = Counter(
"synapse_storage_events_state_resolutions_during_persistence",
"Number of times we had to do state res to calculate new current state",
labelnames=[SERVER_NAME_LABEL],
)

potential_times_prune_extremities = Counter(
"synapse_storage_events_potential_times_prune_extremities",
"Number of times we might be able to prune extremities",
labelnames=[SERVER_NAME_LABEL],
)

times_pruned_extremities = Counter(
"synapse_storage_events_times_pruned_extremities",
"Number of times we were actually be able to prune extremities",
labelnames=[SERVER_NAME_LABEL],
)

@@ -709,7 +717,11 @@ class EventsPersistenceStorageController:
if all_single_prev_not_state:
return (new_forward_extremities, None)

state_delta_counter.inc()
state_delta_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()
if len(new_latest_event_ids) == 1:
state_delta_single_event_counter.inc()
state_delta_single_event_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()

# This is a fairly handwavey check to see if we could
# have guessed what the delta would have been when
@@ -726,7 +736,9 @@ class EventsPersistenceStorageController:
for ev, _ in ev_ctx_rm:
prev_event_ids = set(ev.prev_event_ids())
if latest_event_ids == prev_event_ids:
state_delta_reuse_delta_counter.inc()
state_delta_reuse_delta_counter.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()
break

logger.debug("Calculating state delta for room %s", room_id)
@@ -996,7 +1008,9 @@ class EventsPersistenceStorageController:
),
)

state_resolutions_during_persistence.inc()
state_resolutions_during_persistence.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()

# If the returned state matches the state group of one of the new
# forward extremities then we check if we are able to prune some state
@@ -1024,7 +1038,9 @@ class EventsPersistenceStorageController:
"""See if we can prune any of the extremities after calculating the
resolved state.
"""
potential_times_prune_extremities.inc()
potential_times_prune_extremities.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc()

# We keep all the extremities that have the same state group, and
# see if we can drop the others.
@@ -1122,7 +1138,7 @@ class EventsPersistenceStorageController:

return new_latest_event_ids

times_pruned_extremities.inc()
times_pruned_extremities.labels(**{SERVER_NAME_LABEL: self.server_name}).inc()

logger.info(
"Pruning forward extremities in room %s: from %s -> %s",
@@ -61,7 +61,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.metrics import LaterGauge, register_threadpool
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -85,8 +85,16 @@ perf_logger = logging.getLogger("synapse.storage.TIME")
sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")

sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
sql_txn_count = Counter("synapse_storage_transaction_time_count", "sec", ["desc"])
sql_txn_duration = Counter("synapse_storage_transaction_time_sum", "sec", ["desc"])
sql_txn_count = Counter(
"synapse_storage_transaction_time_count",
"sec",
labelnames=["desc", SERVER_NAME_LABEL],
)
sql_txn_duration = Counter(
"synapse_storage_transaction_time_sum",
"sec",
labelnames=["desc", SERVER_NAME_LABEL],
)

# Unique indexes which have been added in background updates. Maps from table name
@@ -869,8 +877,14 @@ class DatabasePool:

self._current_txn_total_time += duration
self._txn_perf_counters.update(desc, duration)
sql_txn_count.labels(desc).inc(1)
sql_txn_duration.labels(desc).inc(duration)
sql_txn_count.labels(
desc=desc,
**{SERVER_NAME_LABEL: self.server_name},
).inc(1)
sql_txn_duration.labels(
desc=desc,
**{SERVER_NAME_LABEL: self.server_name},
).inc(duration)

async def runInteraction(
self,
@@ -45,6 +45,7 @@ from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.logging.opentracing import tag_args, trace
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.background_updates import ForeignKeyConstraint
@@ -81,6 +82,7 @@ pdus_pruned_from_federation_queue = Counter(
"synapse_federation_server_number_inbound_pdu_pruned",
"The number of events in the inbound federation staging that have been "
"pruned due to the queue getting too long",
labelnames=[SERVER_NAME_LABEL],
)

logger = logging.getLogger(__name__)
@@ -2003,7 +2005,9 @@ class EventFederationWorkerStore(
if not to_delete:
return False

pdus_pruned_from_federation_queue.inc(len(to_delete))
pdus_pruned_from_federation_queue.labels(
**{SERVER_NAME_LABEL: self.server_name}
).inc(len(to_delete))
logger.info(
"Pruning %d events in room %s from federation queue",
len(to_delete),
@@ -55,6 +55,7 @@ from synapse.events import EventBase, StrippedStateEvent, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.events.utils import parse_stripped_state_event
from synapse.logging.opentracing import trace
from synapse.metrics import SERVER_NAME_LABEL
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -89,11 +90,13 @@ if TYPE_CHECKING:

logger = logging.getLogger(__name__)

persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
persist_event_counter = Counter(
"synapse_storage_events_persisted_events", "", labelnames=[SERVER_NAME_LABEL]
)
event_counter = Counter(
"synapse_storage_events_persisted_events_sep",
"",
["type", "origin_type", "origin_entity"],
labelnames=["type", "origin_type", "origin_entity", SERVER_NAME_LABEL],
)

# State event type/key pairs that we need to gather to fill in the
@@ -237,6 +240,7 @@ class PersistEventsStore:
db_conn: LoggingDatabaseConnection,
):
self.hs = hs
self.server_name = hs.hostname
self.db_pool = db
self.store = main_data_store
self.database_engine = db.engine
@@ -357,7 +361,9 @@ class PersistEventsStore:
new_event_links=new_event_links,
sliding_sync_table_changes=sliding_sync_table_changes,
)
persist_event_counter.inc(len(events_and_contexts))
persist_event_counter.labels(**{SERVER_NAME_LABEL: self.server_name}).inc(
len(events_and_contexts)
)

if not use_negative_stream_ordering:
# we don't want to set the event_persisted_position to a negative
@@ -375,7 +381,12 @@ class PersistEventsStore:
origin_type = "remote"
origin_entity = get_domain_from_id(event.sender)

event_counter.labels(event.type, origin_type, origin_entity).inc()
event_counter.labels(
type=event.type,
origin_type=origin_type,
origin_entity=origin_entity,
**{SERVER_NAME_LABEL: self.server_name},
).inc()

if (
not self.hs.config.experimental.msc4293_enabled
@@ -2839,7 +2850,7 @@ class PersistEventsStore:
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
) -> None:
to_prefill = []
to_prefill: List[EventCacheEntry] = []

ev_map = {e.event_id: e for e, _ in events_and_contexts}
if not ev_map:
@@ -52,7 +52,7 @@ from synapse.logging.context import (
run_in_background,
)
from synapse.logging.opentracing import start_active_span
from synapse.metrics import Histogram, LaterGauge
from synapse.metrics import SERVER_NAME_LABEL, Histogram, LaterGauge
from synapse.util import Clock

if typing.TYPE_CHECKING:
@@ -65,12 +65,12 @@ logger = logging.getLogger(__name__)
rate_limit_sleep_counter = Counter(
"synapse_rate_limit_sleep",
"Number of requests slept by the rate limiter",
["rate_limiter_name"],
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
)
rate_limit_reject_counter = Counter(
"synapse_rate_limit_reject",
"Number of requests rejected by the rate limiter",
["rate_limiter_name"],
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
)
queue_wait_timer = Histogram(
"synapse_rate_limit_queue_wait_time_seconds",
@@ -157,6 +157,7 @@ class FederationRateLimiter:

def __init__(
self,
our_server_name: str,
clock: Clock,
config: FederationRatelimitSettings,
metrics_name: Optional[str] = None,
@@ -174,7 +175,10 @@ class FederationRateLimiter:

def new_limiter() -> "_PerHostRatelimiter":
return _PerHostRatelimiter(
clock=clock, config=config, metrics_name=metrics_name
our_server_name=our_server_name,
clock=clock,
config=config,
metrics_name=metrics_name,
)

self.ratelimiters: DefaultDict[str, "_PerHostRatelimiter"] = (
@@ -205,6 +209,7 @@ class FederationRateLimiter:
class _PerHostRatelimiter:
def __init__(
self,
our_server_name: str,
clock: Clock,
config: FederationRatelimitSettings,
metrics_name: Optional[str] = None,
@@ -218,6 +223,7 @@ class _PerHostRatelimiter:
for this rate limiter.
from the rest in the metrics
"""
self.our_server_name = our_server_name
self.clock = clock
self.metrics_name = metrics_name

@@ -296,7 +302,10 @@ class _PerHostRatelimiter:
if self.should_reject():
logger.debug("Ratelimiter(%s): rejecting request", self.host)
if self.metrics_name:
rate_limit_reject_counter.labels(self.metrics_name).inc()
rate_limit_reject_counter.labels(
rate_limiter_name=self.metrics_name,
**{SERVER_NAME_LABEL: self.our_server_name},
).inc()
raise LimitExceededError(
limiter_name="rc_federation",
retry_after_ms=int(self.window_size / self.sleep_limit),
@@ -333,7 +342,10 @@ class _PerHostRatelimiter:
self.sleep_sec,
)
if self.metrics_name:
rate_limit_sleep_counter.labels(self.metrics_name).inc()
rate_limit_sleep_counter.labels(
rate_limiter_name=self.metrics_name,
**{SERVER_NAME_LABEL: self.our_server_name},
).inc()
ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)

self.sleeping_requests.add(request_id)
@@ -867,7 +867,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
        # First test a known access token
        channel = FakeChannel(self.site, self.reactor)
        # type-ignore: FakeChannel is a mock of an HTTPChannel, not a proper HTTPChannel
-        req = SynapseRequest(channel, self.site)  # type: ignore[arg-type]
+        req = SynapseRequest(channel, self.site, self.hs.hostname)  # type: ignore[arg-type]
        req.client.host = EXAMPLE_IPV4_ADDR
        req.requestHeaders.addRawHeader("Authorization", f"Bearer {known_token}")
        req.requestHeaders.addRawHeader("User-Agent", EXAMPLE_USER_AGENT)

@@ -899,7 +899,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
        MAS_USER_AGENT = "masmasmas"

        channel = FakeChannel(self.site, self.reactor)
-        req = SynapseRequest(channel, self.site)  # type: ignore[arg-type]
+        req = SynapseRequest(channel, self.site, self.hs.hostname)  # type: ignore[arg-type]
        req.client.host = MAS_IPV4_ADDR
        req.requestHeaders.addRawHeader(
            "Authorization", f"Bearer {self.auth._admin_token()}"
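The same mechanical change repeats across the request-construction hunks in this diff: `SynapseRequest` now takes a third argument carrying the owning homeserver's name. A compressed sketch of the updated pattern (judging by the `make_request()` change further down, the parameter is called `our_server_name`, so the keyword form should be equivalent; that equivalence is an inference, not shown here):

```python
channel = FakeChannel(self.site, self.reactor)

# Positional form, as used in these tests (FakeChannel only mocks HTTPChannel,
# hence the type-ignore):
req = SynapseRequest(channel, self.site, self.hs.hostname)  # type: ignore[arg-type]

# Presumed keyword form, matching the make_request() helper below:
req = SynapseRequest(channel, self.site, our_server_name=self.hs.hostname)  # type: ignore[arg-type]
```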
@@ -90,6 +90,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
            prev_state,
            new_state,
            is_mine=True,
+            our_server_name=self.hs.hostname,
            wheel_timer=wheel_timer,
            now=now,
            persist=False,

@@ -137,6 +138,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
            prev_state,
            new_state,
            is_mine=True,
+            our_server_name=self.hs.hostname,
            wheel_timer=wheel_timer,
            now=now,
            persist=False,

@@ -187,6 +189,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
            prev_state,
            new_state,
            is_mine=True,
+            our_server_name=self.hs.hostname,
            wheel_timer=wheel_timer,
            now=now,
            persist=False,

@@ -235,6 +238,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
            prev_state,
            new_state,
            is_mine=True,
+            our_server_name=self.hs.hostname,
            wheel_timer=wheel_timer,
            now=now,
            persist=False,

@@ -275,6 +279,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
            prev_state,
            new_state,
            is_mine=False,
+            our_server_name=self.hs.hostname,
            wheel_timer=wheel_timer,
            now=now,
            persist=False,

@@ -314,6 +319,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
            prev_state,
            new_state,
            is_mine=True,
+            our_server_name=self.hs.hostname,
            wheel_timer=wheel_timer,
            now=now,
            persist=False,

@@ -341,6 +347,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
            prev_state,
            new_state,
            is_mine=True,
+            our_server_name=self.hs.hostname,
            wheel_timer=wheel_timer,
            now=now,
            persist=False,

@@ -431,6 +438,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
            prev_state,
            new_state,
            is_mine=True,
+            our_server_name=self.hs.hostname,
            wheel_timer=wheel_timer,
            now=now,
            persist=True,

@@ -494,6 +502,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
            prev_state,
            new_state,
            is_mine=True,
+            our_server_name=self.hs.hostname,
            wheel_timer=wheel_timer,
            now=now,
            persist=False,
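All nine presence-test hunks make the same change: the call under test gains an `our_server_name` keyword. A consolidated sketch of the new call shape (the presence helper these tests exercise is not named in the diff, so `handle_update` here is an assumption; the keyword arguments are exactly those shown above):

```python
# Sketch of the call shape only; `handle_update` is an assumed name for the
# presence state-update helper under test.
handle_update(
    prev_state,
    new_state,
    is_mine=True,                      # one variant above uses is_mine=False
    our_server_name=self.hs.hostname,  # new: scopes the handler's metrics to this homeserver
    wheel_timer=wheel_timer,
    now=now,
    persist=False,                     # one variant above uses persist=True
)
```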
@@ -171,8 +171,9 @@ class TerseJsonTestCase(LoggerCleanupMixin, TestCase):
        site.site_tag = "test-site"
        site.server_version_string = "Server v1"
        site.reactor = Mock()

        request = SynapseRequest(
-            cast(HTTPChannel, FakeChannel(site, self.reactor)), site
+            cast(HTTPChannel, FakeChannel(site, self.reactor)), site, "test_server"
        )
        # Call requestReceived to finish instantiating the object.
        request.content = BytesIO()
@@ -99,7 +99,7 @@ class RemoteKeyResourceTestCase(BaseRemoteKeyResourceTestCase):
        """
        channel = FakeChannel(self.site, self.reactor)
        # channel is a `FakeChannel` but `HTTPChannel` is expected
-        req = SynapseRequest(channel, self.site)  # type: ignore[arg-type]
+        req = SynapseRequest(channel, self.site, self.hs.hostname)  # type: ignore[arg-type]
        req.content = BytesIO(b"")
        req.requestReceived(
            b"GET",

@@ -201,7 +201,7 @@ class EndToEndPerspectivesTests(BaseRemoteKeyResourceTestCase):

        channel = FakeChannel(self.site, self.reactor)
        # channel is a `FakeChannel` but `HTTPChannel` is expected
-        req = SynapseRequest(channel, self.site)  # type: ignore[arg-type]
+        req = SynapseRequest(channel, self.site, self.hs.hostname)  # type: ignore[arg-type]
        req.content = BytesIO(encode_canonical_json(data))

        req.requestReceived(
@@ -432,7 +432,7 @@ def make_request(

    channel = FakeChannel(site, reactor, ip=client_ip)

-    req = request(channel, site)
+    req = request(channel, site, our_server_name="test_server")
    channel.request = req

    req.content = BytesIO(content)
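Worth noting across the test hunks above: generic test utilities that have no real homeserver to hand pass the placeholder string "test_server", while tests that do have one pass `self.hs.hostname`. A minimal sketch of the two variants (`request` is the request class the helper was parametrised with, presumably `SynapseRequest` by default; that default is an assumption):

```python
# Placeholder name used by generic test utilities.
req = request(channel, site, our_server_name="test_server")

# Name of the homeserver under test, used where a HomeserverTestCase is available.
req = SynapseRequest(channel, site, our_server_name=hs.hostname)
```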
@@ -37,7 +37,7 @@ class FederationRateLimiterTestCase(TestCase):
        """A simple test with the default values"""
        reactor, clock = get_clock()
        rc_config = build_rc_config()
-        ratelimiter = FederationRateLimiter(clock, rc_config)
+        ratelimiter = FederationRateLimiter("test_server", clock, rc_config)

        with ratelimiter.ratelimit("testhost") as d1:
            # shouldn't block

@@ -47,7 +47,7 @@ class FederationRateLimiterTestCase(TestCase):
        """Test what happens when we hit the concurrent limit"""
        reactor, clock = get_clock()
        rc_config = build_rc_config({"rc_federation": {"concurrent": 2}})
-        ratelimiter = FederationRateLimiter(clock, rc_config)
+        ratelimiter = FederationRateLimiter("test_server", clock, rc_config)

        with ratelimiter.ratelimit("testhost") as d1:
            # shouldn't block

@@ -74,7 +74,7 @@ class FederationRateLimiterTestCase(TestCase):
        rc_config = build_rc_config(
            {"rc_federation": {"sleep_limit": 2, "sleep_delay": 500}}
        )
-        ratelimiter = FederationRateLimiter(clock, rc_config)
+        ratelimiter = FederationRateLimiter("test_server", clock, rc_config)

        with ratelimiter.ratelimit("testhost") as d1:
            # shouldn't block

@@ -105,7 +105,7 @@ class FederationRateLimiterTestCase(TestCase):
                }
            }
        )
-        ratelimiter = FederationRateLimiter(clock, rc_config)
+        ratelimiter = FederationRateLimiter("test_server", clock, rc_config)

        with ratelimiter.ratelimit("testhost") as d:
            # shouldn't block