Compare commits

...

11 Commits

Author SHA1 Message Date
Eric Eastwood
6ce2f3e59d Fix CPU time going backwards when daemonize
When we `daemonize`, we fork the process and cputime metrics get confused
about the per-thread resource usage appearing to go backwards because we're
comparing the resource usage (`rusage`) from the original process to the
forked process.

We now kick off the background tasks (`run_as_background_process`) after we
have forked the process so the `rusage` we record when we `start` is in the
same thread when we `stop`.

Bad log examples from before:
```
synapse.logging.context - ERROR - _schedule_next_expiry-0 - utime went backwards! 0.050467 < 0.886526
synapse.logging.context - ERROR - _schedule_db_events-0 - stime went backwards! 0.009941 < 0.155106
synapse.logging.context - ERROR - wake_destinations_needing_catchup-0 - stime went backwards! 0.010175 < 0.130923
synapse.logging.context - ERROR - resume_sync_partial_state_room-0 - utime went backwards! 0.052898 < 0.886526
```

Testing strategy:

 1. Run with `daemonize: true` in your `homeserver.yaml`
 1. `poetry run synapse_homeserver --config-path homeserver.yaml`
 1. Shutdown the server
 1. Look for any bad log entries in your homeserver logs:
    - `Expected logging context sentinel but found main`
    - `Expected logging context main was lost`
    - `utime went backwards!`/`stime went backwards!`
2025-09-02 15:11:04 -05:00
Eric Eastwood
93044f4c5b Fix lints 2025-09-02 14:28:41 -05:00
Eric Eastwood
c7a80b63ec No need to patch_inline_callbacks when we don't have log context rules anymore 2025-09-02 14:07:26 -05:00
Eric Eastwood
1f384b0e21 Merge branch 'develop' into madlittlemods/log-context-using-contextvars3 2025-09-02 13:39:44 -05:00
Eric Eastwood
b2997a8f20 Suppress "Applying schema" log noise bulk when running Complement tests (#18878)
If Synapse is under test (`SYNAPSE_LOG_TESTING` is set), we don't care
about seeing the "Applying schema" log lines at the INFO level every
time we run the tests (it's 100 lines of bulk for each homeserver).

```
synapse_main | 2025-08-29 22:34:03,453 - synapse.storage.prepare_database - 433 - INFO - main - Applying schema deltas for v73
synapse_main | 2025-08-29 22:34:03,454 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/01event_failed_pull_attempts.sql
synapse_main | 2025-08-29 22:34:03,463 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/02add_pusher_enabled.sql
synapse_main | 2025-08-29 22:34:03,473 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/02room_id_indexes_for_purging.sql
synapse_main | 2025-08-29 22:34:03,482 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/03pusher_device_id.sql
synapse_main | 2025-08-29 22:34:03,492 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/03users_approved_column.sql
synapse_main | 2025-08-29 22:34:03,502 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/04partial_join_details.sql
synapse_main | 2025-08-29 22:34:03,513 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/04pending_device_list_updates.sql
...
```


The Synapse logs are visible when a Complement test fails or you use
`COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS=1`. This is spawning from a
Complement test with three homeservers and wanting less log noise to
scroll through.
2025-09-02 13:34:47 -05:00
Eric Eastwood
bff4a11b3f Re-introduce: Fix LaterGauge metrics to collect from all servers (#18791)
Re-introduce: https://github.com/element-hq/synapse/pull/18751 that was
reverted in https://github.com/element-hq/synapse/pull/18789 (explains
why the PR was reverted in the first place).

- Adds a `cleanup` pattern that cleans up metrics from each homeserver
in the tests. Previously, the list of hooks built up until our CI
machines couldn't operate properly, see
https://github.com/element-hq/synapse/pull/18789
- Fix long-standing issue with `synapse_background_update_status`
metrics only tracking the last database listed in the config (see
https://github.com/element-hq/synapse/pull/18791#discussion_r2261706749)
2025-09-02 12:14:27 -05:00
Eric Eastwood
3e66e0a1b8 Rewrite docs and remove log context rules (no longer relevant) 2025-08-27 22:46:58 -05:00
Eric Eastwood
0c8759bbb6 Remove mentions of Synapse logcontext rules (the distinction doesn't matter anymore) 2025-08-27 20:27:45 -05:00
Eric Eastwood
4303879cfe Add changelog 2025-08-27 20:13:34 -05:00
Eric Eastwood
3742b3b3fb Mark make_deferred_yieldable for future deletion 2025-08-27 19:57:32 -05:00
Quentin Gliech
224cb3f827 WIP: use a contextvar to store the logcontext 2025-08-27 19:57:28 -05:00
35 changed files with 540 additions and 829 deletions

1
changelog.d/18791.misc Normal file
View File

@@ -0,0 +1 @@
Fix `LaterGauge` metrics to collect from all servers.

1
changelog.d/18871.misc Normal file
View File

@@ -0,0 +1 @@
Store the `LoggingContext` in a `ContextVar` instead of a thread-local variable.

1
changelog.d/18878.docker Normal file
View File

@@ -0,0 +1 @@
Suppress "Applying schema" log noise bulk when `SYNAPSE_LOG_TESTING` is set.

View File

@@ -77,6 +77,13 @@ loggers:
#}
synapse.visibility.filtered_event_debug:
level: DEBUG
{#
If Synapse is under test, we don't care about seeing the "Applying schema" log
lines at the INFO level every time we run the tests (it's 100 lines of bulk)
#}
synapse.storage.prepare_database:
level: WARN
{% endif %}
root:

View File

@@ -1,12 +1,12 @@
# Log Contexts
To help track the processing of individual requests, synapse uses a
'`log context`' to track which request it is handling at any given
moment. This is done via a thread-local variable; a `logging.Filter` is
then used to fish the information back out of the thread-local variable
`LoggingContext` to track which request it is handling at any given
moment. This is done via a `ContextVar` variable; a `logging.Filter` is
then used to fish the information back out of the `ContextVar` variable
and add it to each log record.
Logcontexts are also used for CPU and database accounting, so that we
Log contexts are also used for CPU and database accounting, so that we
can track which requests were responsible for high CPU use or database
activity.
@@ -14,18 +14,11 @@ The `synapse.logging.context` module provides facilities for managing
the current log context (as well as providing the `LoggingContextFilter`
class).
Asynchronous functions make the whole thing complicated, so this document describes
how it all works, and how to write code which follows the rules.
In this document, "awaitable" refers to any object which can be `await`ed. In the context of
Synapse, that normally means either a coroutine or a Twisted
In this document, "awaitable" refers to any object which can be `await`ed. In the
context of Synapse, that normally means either a coroutine or a Twisted
[`Deferred`](https://twistedmatrix.com/documents/current/api/twisted.internet.defer.Deferred.html).
## Logcontexts without asynchronous code
In the absence of any asynchronous voodoo, things are simple enough. As with
any code of this nature, the rule is that our function should leave
things as it found them:
## Basic usage
```python
from synapse.logging import context # omitted from future snippets
@@ -45,7 +38,7 @@ def do_request_handling():
logger.debug("phew") # this will be logged against request_id
```
LoggingContext implements the context management methods, so the above
`LoggingContext` implements the context management methods, so the above
can be written much more succinctly as:
```python
@@ -59,197 +52,76 @@ def do_request_handling():
logger.debug("phew")
```
## Using logcontexts with awaitables
### The `sentinel` context
Awaitables break the linear flow of code so that there is no longer a single entry point
where we should set the logcontext and a single exit point where we should remove it.
The default context is `context.SENTINEL_CONTEXT`, which is a sentinel value to
represent the root context. This is what is used when there is no other context set.
Consider the example above, where `do_request_handling` needs to do some
blocking operation, and returns an awaitable:
No CPU/database usage metrics are recorded against the `sentinel` context.
Ideally, nothing from the Synapse homeserver would be logged against the `sentinel`
context as we want to know where the logs came from. In practice, this is not always the
case yet especially outside of request handling.
Previously, the `sentinel` context played a bigger role when we had to carefully deal
with thread-local storage; as we had to make sure to not leak another context to another
task after we gave up control to the reactor so we set the
### `PreserveLoggingContext`
In a similar vein of no longer as relevant, `PreserveLoggingContext` is another context
manager helper and a little bit of syntactic sugar to set the current log context
(without finishing it) and restore the previous context on exit.
```python
async def handle_request(request_id):
with context.LoggingContext() as request_context:
request_context.request = request_id
await do_request_handling()
import logging
from synapse.logging.context import LoggingContext
logger = logging.getLogger(__name__)
def main() -> None:
with context.LoggingContext("main"):
task_context = context.LoggingContext("task")
with task_context:
logger.debug("foo")
# Bad: will throw an error because `task_context` is already finished
with task_context:
logger.debug("bar")
logger.debug("finished")
```
In the above flow:
- The logcontext is set
- `do_request_handling` is called, and returns an awaitable
- `handle_request` awaits the awaitable
- Execution of `handle_request` is suspended
So we have stopped processing the request (and will probably go on to
start processing the next), without clearing the logcontext.
To circumvent this problem, synapse code assumes that, wherever you have
an awaitable, you will want to `await` it. To that end, wherever
functions return awaitables, we adopt the following conventions:
**Rules for functions returning awaitables:**
> - If the awaitable is already complete, the function returns with the
> same logcontext it started with.
> - If the awaitable is incomplete, the function clears the logcontext
> before returning; when the awaitable completes, it restores the
> logcontext before running any callbacks.
That sounds complicated, but actually it means a lot of code (including
the example above) "just works". There are two cases:
- If `do_request_handling` returns a completed awaitable, then the
logcontext will still be in place. In this case, execution will
continue immediately after the `await`; the "finished" line will
be logged against the right context, and the `with` block restores
the original context before we return to the caller.
- If the returned awaitable is incomplete, `do_request_handling` clears
the logcontext before returning. The logcontext is therefore clear
when `handle_request` `await`s the awaitable.
Once `do_request_handling`'s awaitable completes, it will reinstate
the logcontext, before running the second half of `handle_request`,
so again the "finished" line will be logged against the right context,
and the `with` block restores the original context.
As an aside, it's worth noting that `handle_request` follows our rules
- though that only matters if the caller has its own logcontext which it
cares about.
The following sections describe pitfalls and helpful patterns when
implementing these rules.
Always await your awaitables
----------------------------
Whenever you get an awaitable back from a function, you should `await` on
it as soon as possible. Do not pass go; do not do any logging; do not
call any other functions.
This can be fixed by using `PreserveLoggingContext`:
```python
async def fun():
logger.debug("starting")
await do_some_stuff() # just like this
import logging
from synapse.logging.context import LoggingContext
coro = more_stuff()
result = await coro # also fine, of course
logger = logging.getLogger(__name__)
return result
def main() -> None:
with context.LoggingContext("main"):
task_context = context.LoggingContext("task")
with PreserveLoggingContext(task_context):
logger.debug("foo")
with PreserveLoggingContext(task_context):
logger.debug("bar")
logger.debug("finished") # this will be logged against main
```
Provided this pattern is followed all the way back up to the callchain
to where the logcontext was set, this will make things work out ok:
provided `do_some_stuff` and `more_stuff` follow the rules above, then
so will `fun`.
Or you could equivalently just manage the log context manually via
`set_current_context`.
It's all too easy to forget to `await`: for instance if we forgot that
`do_some_stuff` returned an awaitable, we might plough on regardless. This
leads to a mess; it will probably work itself out eventually, but not
before a load of stuff has been logged against the wrong context.
(Normally, other things will break, more obviously, if you forget to
`await`, so this tends not to be a major problem in practice.)
Of course sometimes you need to do something a bit fancier with your
awaitable - not all code follows the linear A-then-B-then-C pattern.
Notes on implementing more complex patterns are in later sections.
## Where you create a new awaitable, make it follow the rules
Most of the time, an awaitable comes from another synapse function.
Sometimes, though, we need to make up a new awaitable, or we get an awaitable
back from external code. We need to make it follow our rules.
The easy way to do it is by using `context.make_deferred_yieldable`. Suppose we want to implement
`sleep`, which returns a deferred which will run its callbacks after a
given number of seconds. That might look like:
```python
# not a logcontext-rules-compliant function
def get_sleep_deferred(seconds):
d = defer.Deferred()
reactor.callLater(seconds, d.callback, None)
return d
```
That doesn't follow the rules, but we can fix it by calling it through
`context.make_deferred_yieldable`:
```python
async def sleep(seconds):
return await context.make_deferred_yieldable(get_sleep_deferred(seconds))
```
## Fire-and-forget
Sometimes you want to fire off a chain of execution, but not wait for
its result. That might look a bit like this:
```python
async def do_request_handling():
await foreground_operation()
# *don't* do this
background_operation()
logger.debug("Request handling complete")
async def background_operation():
await first_background_step()
logger.debug("Completed first step")
await second_background_step()
logger.debug("Completed second step")
```
The above code does a couple of steps in the background after
`do_request_handling` has finished. The log lines are still logged
against the `request_context` logcontext, which may or may not be
desirable. There are two big problems with the above, however. The first
problem is that, if `background_operation` returns an incomplete
awaitable, it will expect its caller to `await` immediately, so will have
cleared the logcontext. In this example, that means that 'Request
handling complete' will be logged without any context.
The second problem, which is potentially even worse, is that when the
awaitable returned by `background_operation` completes, it will restore
the original logcontext. There is nothing waiting on that awaitable, so
the logcontext will leak into the reactor and possibly get attached to
some arbitrary future operation.
There are two potential solutions to this.
One option is to surround the call to `background_operation` with a
`PreserveLoggingContext` call. That will reset the logcontext before
starting `background_operation` (so the context restored when the
deferred completes will be the empty logcontext), and will restore the
current logcontext before continuing the foreground process:
```python
async def do_request_handling():
await foreground_operation()
# start background_operation off in the empty logcontext, to
# avoid leaking the current context into the reactor.
with PreserveLoggingContext():
background_operation()
# this will now be logged against the request context
logger.debug("Request handling complete")
```
Obviously that option means that the operations done in
`background_operation` would be not be logged against a logcontext
(though that might be fixed by setting a different logcontext via a
`with LoggingContext(...)` in `background_operation`).
The second option is to use `context.run_in_background`, which wraps a
function so that it doesn't reset the logcontext even when it returns
an incomplete awaitable, and adds a callback to the returned awaitable to
reset the logcontext. In other words, it turns a function that follows
the Synapse rules about logcontexts and awaitables into one which behaves
more like an external function --- the opposite operation to that
described in the previous section. It can be used like this:
To drive an awaitable in the background, you can use `context.run_in_background`:
```python
async def do_request_handling():
@@ -261,104 +133,13 @@ async def do_request_handling():
logger.debug("Request handling complete")
```
## Passing synapse deferreds into third-party functions
A typical example of this is where we want to collect together two or
more awaitables via `defer.gatherResults`:
```python
a1 = operation1()
a2 = operation2()
a3 = defer.gatherResults([a1, a2])
```
This is really a variation of the fire-and-forget problem above, in that
we are firing off `a1` and `a2` without awaiting on them. The difference
is that we now have third-party code attached to their callbacks. Anyway
either technique given in the [Fire-and-forget](#fire-and-forget)
section will work.
Of course, the new awaitable returned by `gather` needs to be
wrapped in order to make it follow the logcontext rules before we can
yield it, as described in [Where you create a new awaitable, make it
follow the
rules](#where-you-create-a-new-awaitable-make-it-follow-the-rules).
So, option one: reset the logcontext before starting the operations to
be gathered:
```python
async def do_request_handling():
with PreserveLoggingContext():
a1 = operation1()
a2 = operation2()
result = await defer.gatherResults([a1, a2])
```
In this case particularly, though, option two, of using
`context.run_in_background` almost certainly makes more sense, so that
`operation1` and `operation2` are both logged against the original
logcontext. This looks like:
```python
async def do_request_handling():
a1 = context.run_in_background(operation1)
a2 = context.run_in_background(operation2)
result = await make_deferred_yieldable(defer.gatherResults([a1, a2]))
result = await defer.gatherResults([a1, a2])
```
## A note on garbage-collection of awaitable chains
It turns out that our logcontext rules do not play nicely with awaitable
chains which get orphaned and garbage-collected.
Imagine we have some code that looks like this:
```python
listener_queue = []
def on_something_interesting():
for d in listener_queue:
d.callback("foo")
async def await_something_interesting():
new_awaitable = defer.Deferred()
listener_queue.append(new_awaitable)
with PreserveLoggingContext():
await new_awaitable
```
Obviously, the idea here is that we have a bunch of things which are
waiting for an event. (It's just an example of the problem here, but a
relatively common one.)
Now let's imagine two further things happen. First of all, whatever was
waiting for the interesting thing goes away. (Perhaps the request times
out, or something *even more* interesting happens.)
Secondly, let's suppose that we decide that the interesting thing is
never going to happen, and we reset the listener queue:
```python
def reset_listener_queue():
listener_queue.clear()
```
So, both ends of the awaitable chain have now dropped their references,
and the awaitable chain is now orphaned, and will be garbage-collected at
some point. Note that `await_something_interesting` is a coroutine,
which Python implements as a generator function. When Python
garbage-collects generator functions, it gives them a chance to
clean up by making the `await` (or `yield`) raise a `GeneratorExit`
exception. In our case, that means that the `__exit__` handler of
`PreserveLoggingContext` will carefully restore the request context, but
there is now nothing waiting for its return, so the request context is
never cleared.
To reiterate, this problem only arises when *both* ends of a awaitable
chain are dropped. Dropping the the reference to an awaitable you're
supposed to be awaiting is bad practice, so this doesn't
actually happen too much. Unfortunately, when it does happen, it will
lead to leaked logcontexts which are incredibly hard to track down.
`background_process_metrics.run_as_background_process` also exists if you want some
automatic tracing and metrics for the background task.

View File

@@ -86,12 +86,5 @@ import synapse.util # noqa: E402
__version__ = synapse.util.SYNAPSE_VERSION
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
# running the packaging tox test.
from synapse.util.patch_inline_callbacks import do_patch
do_patch()
check_rust_lib_up_to_date()

View File

@@ -153,9 +153,13 @@ def get_registered_paths_for_default(
"""
hs = MockHomeserver(base_config, worker_app)
# TODO We only do this to avoid an error, but don't need the database etc
hs.setup()
return get_registered_paths_for_hs(hs)
registered_paths = get_registered_paths_for_hs(hs)
hs.cleanup()
return registered_paths
def elide_http_methods_if_unconflicting(

View File

@@ -99,6 +99,7 @@ from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.types import ISynapseReactor
from synapse.util import SYNAPSE_VERSION, Clock
from synapse.util.stringutils import random_string
# Cast safety: Twisted does some naughty magic which replaces the
# twisted.internet.reactor module with a Reactor instance at runtime.
@@ -323,6 +324,7 @@ class MockHomeserver:
self.config = config
self.hostname = config.server.server_name
self.version_string = SYNAPSE_VERSION
self.instance_id = random_string(5)
def get_clock(self) -> Clock:
return self.clock
@@ -330,6 +332,9 @@ class MockHomeserver:
def get_reactor(self) -> ISynapseReactor:
return reactor
def get_instance_id(self) -> str:
return self.instance_id
def get_instance_name(self) -> str:
return "master"
@@ -685,7 +690,15 @@ class Porter:
)
prepare_database(db_conn, engine, config=self.hs_config)
# Type safety: ignore that we're using Mock homeservers here.
store = Store(DatabasePool(hs, db_config, engine), db_conn, hs) # type: ignore[arg-type]
store = Store(
DatabasePool(
hs, # type: ignore[arg-type]
db_config,
engine,
),
db_conn,
hs, # type: ignore[arg-type]
)
db_conn.commit()
return store

View File

@@ -601,6 +601,12 @@ async def start(hs: "HomeServer") -> None:
hs.get_datastores().main.db_pool.start_profiling()
hs.get_pusherpool().start()
# Register background tasks required by this server. This must be done
# somewhat manually due to the background tasks not being registered
# unless handlers are instantiated.
if hs.config.worker.run_background_tasks:
hs.start_background_tasks()
# Log when we start the shut down process.
hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", logger.info, "Shutting down..."

View File

@@ -37,6 +37,7 @@ Events are replicated via a separate events stream.
"""
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Dict,
@@ -67,6 +68,25 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class QueueNames(str, Enum):
PRESENCE_MAP = "presence_map"
KEYED_EDU = "keyed_edu"
KEYED_EDU_CHANGED = "keyed_edu_changed"
EDUS = "edus"
POS_TIME = "pos_time"
PRESENCE_DESTINATIONS = "presence_destinations"
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
for queue_name in QueueNames:
queue_name_to_gauge_map[queue_name] = LaterGauge(
name=f"synapse_federation_send_queue_{queue_name.value}_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""
@@ -111,23 +131,16 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name: str, queue: Sized) -> None:
LaterGauge(
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
def register(queue_name: QueueNames, queue: Sized) -> None:
queue_name_to_gauge_map[queue_name].register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(queue)},
)
for queue_name in [
"presence_map",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
for queue_name in QueueNames:
queue = getattr(self, queue_name.value)
assert isinstance(queue, Sized)
register(queue_name, queue=queue)
self.clock.looping_call(self._clear_queue, 30 * 1000)

View File

@@ -199,6 +199,24 @@ sent_pdus_destination_dist_total = Counter(
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_destinations_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_pdus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_edus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
@@ -398,11 +416,9 @@ class FederationSender(AbstractFederationSender):
# map from destination to PerDestinationQueue
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_destinations_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
@@ -410,22 +426,17 @@ class FederationSender(AbstractFederationSender):
)
},
)
LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_pdus_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
},
)
LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_edus_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)

View File

@@ -173,6 +173,18 @@ state_transition_counter = Counter(
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)
presence_user_to_current_state_size_gauge = LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
presence_wheel_timer_size_gauge = LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -779,11 +791,9 @@ class PresenceHandler(BasePresenceHandler):
EduTypes.PRESENCE, self.incoming_presence
)
LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
presence_user_to_current_state_size_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.user_to_current_state)},
)
# The per-device presence state, maps user to devices to per-device presence state.
@@ -882,11 +892,9 @@ class PresenceHandler(BasePresenceHandler):
60 * 1000,
)
LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
presence_wheel_timer_size_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.wheel_timer)},
)
# Used to handle sending of presence to newly joined users/servers

View File

@@ -164,11 +164,13 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
return counts
LaterGauge(
in_flight_requests = LaterGauge(
name="synapse_http_server_in_flight_requests_count",
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
caller=_get_in_flight_counts,
)
in_flight_requests.register_hook(
homeserver_instance_id=None, hook=_get_in_flight_counts
)

View File

@@ -34,6 +34,7 @@ import logging
import threading
import typing
import warnings
from contextvars import ContextVar
from types import TracebackType
from typing import (
TYPE_CHECKING,
@@ -653,13 +654,12 @@ class PreserveLoggingContext:
)
_thread_local = threading.local()
_thread_local.current_context = SENTINEL_CONTEXT
_current_context: ContextVar[LoggingContextOrSentinel] = ContextVar("current_context")
def current_context() -> LoggingContextOrSentinel:
"""Get the current logging context from thread local storage"""
return getattr(_thread_local, "current_context", SENTINEL_CONTEXT)
return _current_context.get(SENTINEL_CONTEXT)
def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel:
@@ -680,7 +680,7 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe
if current is not context:
rusage = get_thread_resource_usage()
current.stop(rusage)
_thread_local.current_context = context
_current_context.set(context)
context.start(rusage)
return current
@@ -796,7 +796,6 @@ def run_in_background(
CRITICAL error about an unhandled error will be logged without much
indication about where it came from.
"""
current = current_context()
try:
res = f(*args, **kwargs)
except Exception:
@@ -825,23 +824,6 @@ def run_in_background(
# optimise out the messing about
return d
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
return d
@@ -861,65 +843,20 @@ def run_coroutine_in_background(
cannot change the log contexts.
"""
current = current_context()
d = defer.ensureDeferred(coroutine)
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
return d
return defer.ensureDeferred(coroutine)
T = TypeVar("T")
# TODO: This function is a no-op now and should be removed in a follow-up PR.
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
"""Given a deferred, make it follow the Synapse logcontext rules:
If the deferred has completed, essentially does nothing (just returns another
completed deferred with the result/failure).
If the deferred has not yet completed, resets the logcontext before
returning a deferred. Then, when the deferred completes, restores the
current logcontext before running callbacks/errbacks.
(This is more-or-less the opposite operation to run_in_background.)
"""
if deferred.called and not deferred.paused:
# it looks like this deferred is ready to run any callbacks we give it
# immediately. We may as well optimise out the logcontext faffery.
return deferred
# ok, we can't be sure that a yield won't block, so let's reset the
# logcontext, and add a callback to the deferred to restore it.
prev_context = set_current_context(SENTINEL_CONTEXT)
deferred.addBoth(_set_context_cb, prev_context)
return deferred
ResultT = TypeVar("ResultT")
def _set_context_cb(result: ResultT, context: LoggingContextOrSentinel) -> ResultT:
"""A callback function which just sets the logging context"""
set_current_context(context)
return result
def defer_to_thread(
reactor: "ISynapseReactor", f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
) -> "defer.Deferred[R]":
@@ -931,9 +868,6 @@ def defer_to_thread(
logcontext (so its CPU usage metrics will get attributed to the current
logcontext). `f` should preserve the logcontext it is given.
The result deferred follows the Synapse logcontext rules: you should `yield`
on it.
Args:
reactor: The reactor in whose main thread the Deferred will be invoked,
and whose threadpool we should use for the function.
@@ -971,9 +905,6 @@ def defer_to_threadpool(
logcontext (so its CPU usage metrics will get attributed to the current
logcontext). `f` should preserve the logcontext it is given.
The result deferred follows the Synapse logcontext rules: you should `yield`
on it.
Args:
reactor: The reactor in whose main thread the Deferred will be invoked.
Normally this will be hs.get_reactor().
@@ -991,18 +922,6 @@ def defer_to_threadpool(
A Deferred which fires a callback with the result of `f`, or an
errback if `f` throws an exception.
"""
curr_context = current_context()
if not curr_context:
logger.warning(
"Calling defer_to_threadpool from sentinel context: metrics will be lost"
return make_deferred_yieldable(
threads.deferToThreadPool(reactor, threadpool, f, *args, **kwargs)
)
parent_context = None
else:
assert isinstance(curr_context, LoggingContext)
parent_context = curr_context
def g() -> R:
with LoggingContext(str(curr_context), parent_context=parent_context):
return f(*args, **kwargs)
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))

View File

@@ -73,8 +73,6 @@ logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics"
all_gauges: Dict[str, Collector] = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
SERVER_NAME_LABEL = "server_name"
@@ -163,42 +161,110 @@ class LaterGauge(Collector):
name: str
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
_instance_id_to_hook_map: Dict[
Optional[str], # instance_id
Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
],
] = attr.ib(factory=dict, hash=False)
"""
Map from homeserver instance_id to a callback. Each callback should either return a
value (if there are no labels for this metric), or dict mapping from a label tuple
to a value.
We use `instance_id` instead of `server_name` because it's possible to have multiple
workers running in the same process with the same `server_name`.
"""
def collect(self) -> Iterable[Metric]:
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
# (we don't enforce it here, one level up).
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
for homeserver_instance_id, hook in self._instance_id_to_hook_map.items():
try:
calls = self.caller()
hook_result = hook()
except Exception:
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
logger.exception(
"Exception running callback for LaterGauge(%s) for homeserver_instance_id=%s",
self.name,
homeserver_instance_id,
)
# Continue to return the rest of the metrics that aren't broken
continue
if isinstance(calls, (int, float)):
g.add_metric([], calls)
if isinstance(hook_result, (int, float)):
g.add_metric([], hook_result)
else:
for k, v in calls.items():
for k, v in hook_result.items():
g.add_metric(k, v)
yield g
def register_hook(
self,
*,
homeserver_instance_id: Optional[str],
hook: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
) -> None:
"""
Register a callback/hook that will be called to generate a metric samples for
the gauge.
Args:
homeserver_instance_id: The unique ID for this Synapse process instance
(`hs.get_instance_id()`) that this hook is associated with. This can be used
later to lookup all hooks associated with a given server name in order to
unregister them. This should only be omitted for global hooks that work
across all homeservers.
hook: A callback that should either return a value (if there are no
labels for this metric), or dict mapping from a label tuple to a value
"""
# We shouldn't have multiple hooks registered for the same homeserver `instance_id`.
existing_hook = self._instance_id_to_hook_map.get(homeserver_instance_id)
assert existing_hook is None, (
f"LaterGauge(name={self.name}) hook already registered for homeserver_instance_id={homeserver_instance_id}. "
"This is likely a Synapse bug and you forgot to unregister the previous hooks for "
"the server (especially in tests)."
)
self._instance_id_to_hook_map[homeserver_instance_id] = hook
def unregister_hooks_for_homeserver_instance_id(
self, homeserver_instance_id: str
) -> None:
"""
Unregister all hooks associated with the given homeserver `instance_id`. This should be
called when a homeserver is shutdown to avoid extra hooks sitting around.
Args:
homeserver_instance_id: The unique ID for this Synapse process instance to
unregister hooks for (`hs.get_instance_id()`).
"""
self._instance_id_to_hook_map.pop(homeserver_instance_id, None)
def __attrs_post_init__(self) -> None:
self._register()
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
# We shouldn't have multiple metrics with the same name. Typically, metrics
# should be created globally so you shouldn't be running into this and this will
# catch any stupid mistakes. The `REGISTRY.register(self)` call above will also
# raise an error if the metric already exists but to make things explicit, we'll
# also check here.
existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name)
assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists. "
# Keep track of the gauge so we can clean it up later.
all_later_gauges_to_clean_up_on_shutdown[self.name] = self
all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {}
"""
Track all `LaterGauge` instances so we can remove any associated hooks during homeserver
shutdown.
"""
# `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -250,7 +316,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
# Protects access to _registrations
self._lock = threading.Lock()
self._register_with_collector()
REGISTRY.register(self)
def register(
self,
@@ -341,14 +407,6 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
"""

View File

@@ -223,10 +223,9 @@ def run_as_background_process(
This should be used to wrap processes which are fired off to run in the
background, instead of being associated with a particular request.
It returns a Deferred which completes when the function completes, but it doesn't
follow the synapse logcontext rules, which makes it appropriate for passing to
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
normal synapse async function).
It returns a Deferred which completes when the function completes, which makes it
appropriate for passing to clock.looping_call and friends (or for
firing-and-forgetting in the middle of a normal synapse async function).
Args:
desc: a description for this background process type
@@ -241,8 +240,6 @@ def run_as_background_process(
Returns:
Deferred which returns the result of func, or `None` if func raises.
Note that the returned Deferred does not follow the synapse logcontext
rules.
"""
async def run() -> Optional[R]:

View File

@@ -237,10 +237,9 @@ def run_as_background_process(
This should be used to wrap processes which are fired off to run in the
background, instead of being associated with a particular request.
It returns a Deferred which completes when the function completes, but it doesn't
follow the synapse logcontext rules, which makes it appropriate for passing to
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
normal synapse async function).
It returns a Deferred which completes when the function completes, which makes it
appropriate for passing to clock.looping_call and friends (or for
firing-and-forgetting in the middle of a normal synapse async function).
Args:
desc: a description for this background process type
@@ -255,8 +254,6 @@ def run_as_background_process(
Returns:
Deferred which returns the result of func, or `None` if func raises.
Note that the returned Deferred does not follow the synapse logcontext
rules.
"""
logger.warning(
@@ -1375,9 +1372,7 @@ class ModuleApi:
Args:
f: The function to call repeatedly. f can be either synchronous or
asynchronous, and must follow Synapse's logcontext rules.
More info about logcontexts is available at
https://element-hq.github.io/synapse/latest/log_contexts.html
asynchronous.
msec: How long to wait between calls in milliseconds.
*args: Positional arguments to pass to function.
desc: The background task's description. Default to the function's name.
@@ -1431,9 +1426,7 @@ class ModuleApi:
Args:
msec: How long to wait before calling, in milliseconds.
f: The function to call once. f can be either synchronous or
asynchronous, and must follow Synapse's logcontext rules.
More info about logcontexts is available at
https://element-hq.github.io/synapse/latest/log_contexts.html
asynchronous.
*args: Positional arguments to pass to function.
desc: The background task's description. Default to the function's name.
**kwargs: Keyword arguments to pass to function.
@@ -1668,10 +1661,9 @@ class ModuleApi:
This should be used to wrap processes which are fired off to run in the
background, instead of being associated with a particular request.
It returns a Deferred which completes when the function completes, but it doesn't
follow the synapse logcontext rules, which makes it appropriate for passing to
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
normal synapse async function).
It returns a Deferred which completes when the function completes, which makes
it appropriate for passing to clock.looping_call and friends (or for
firing-and-forgetting in the middle of a normal synapse async function).
Args:
desc: a description for this background process type
@@ -1686,8 +1678,6 @@ class ModuleApi:
Returns:
Deferred which returns the result of func, or `None` if func raises.
Note that the returned Deferred does not follow the synapse logcontext
rules.
"""
return _run_as_background_process(
desc, self.server_name, func, *args, bg_start_span=bg_start_span, **kwargs

View File

@@ -86,6 +86,24 @@ users_woken_by_stream_counter = Counter(
labelnames=["stream", SERVER_NAME_LABEL],
)
notifier_listeners_gauge = LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_rooms_gauge = LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_users_gauge = LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
T = TypeVar("T")
@@ -281,28 +299,20 @@ class Notifier:
)
}
LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
notifier_listeners_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(), hook=count_listeners
)
LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
notifier_rooms_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
},
)
LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
notifier_users_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.user_to_user_stream)},
)
def add_replication_callback(self, cb: Callable[[], None]) -> None:

View File

@@ -106,6 +106,18 @@ user_ip_cache_counter = Counter(
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
)
tcp_resource_total_connections_gauge = LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
tcp_command_queue_gauge = LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
)
# the type of the entries in _command_queues_by_stream
_StreamCommandQueue = Deque[
@@ -243,11 +255,9 @@ class ReplicationCommandHandler:
# outgoing replication commands to.)
self._connections: List[IReplicationConnection] = []
LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._connections)},
tcp_resource_total_connections_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self._connections)},
)
# When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -266,11 +276,9 @@ class ReplicationCommandHandler:
# from that connection.
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
caller=lambda: {
tcp_command_queue_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(stream_name, self.server_name): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
},

View File

@@ -527,7 +527,10 @@ pending_commands = LaterGauge(
name="synapse_replication_tcp_protocol_pending_commands",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
pending_commands.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
},
)
@@ -544,7 +547,10 @@ transport_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
transport_send_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
},
)
@@ -571,7 +577,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
tcp_transport_kernel_send_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
@@ -582,7 +591,10 @@ tcp_transport_kernel_read_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
tcp_transport_kernel_read_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},

View File

@@ -129,7 +129,10 @@ from synapse.http.client import (
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.media.media_repository import MediaRepository
from synapse.metrics import register_threadpool
from synapse.metrics import (
all_later_gauges_to_clean_up_on_shutdown,
register_threadpool,
)
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
from synapse.module_api import ModuleApi
from synapse.module_api.callbacks import ModuleApiCallbacks
@@ -363,11 +366,36 @@ class HomeServer(metaclass=abc.ABCMeta):
self.datastores = Databases(self.DATASTORE_CLASS, self)
logger.info("Finished setting up.")
# Register background tasks required by this server. This must be done
# somewhat manually due to the background tasks not being registered
# unless handlers are instantiated.
if self.config.worker.run_background_tasks:
self.setup_background_tasks()
def __del__(self) -> None:
"""
Called when an the homeserver is garbage collected.
Make sure we actually do some clean-up, rather than leak data.
"""
self.cleanup()
def cleanup(self) -> None:
"""
WIP: Clean-up any references to the homeserver and stop any running related
processes, timers, loops, replication stream, etc.
This should be called wherever you care about the HomeServer being completely
garbage collected like in tests. It's not necessary to call if you plan to just
shut down the whole Python process anyway.
Can be called multiple times.
"""
logger.info("Received cleanup request for %s.", self.hostname)
# TODO: Stop background processes, timers, loops, replication stream, etc.
# Cleanup metrics associated with the homeserver
for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values():
later_gauge.unregister_hooks_for_homeserver_instance_id(
self.get_instance_id()
)
logger.info("Cleanup complete for %s.", self.hostname)
def start_listening(self) -> None: # noqa: B027 (no-op by design)
"""Start the HTTP, manhole, metrics, etc listeners
@@ -376,7 +404,7 @@ class HomeServer(metaclass=abc.ABCMeta):
appropriate listeners.
"""
def setup_background_tasks(self) -> None:
def start_background_tasks(self) -> None:
"""
Some handlers have side effects on instantiation (like registering
background updates). This function causes them to be fetched, and

View File

@@ -61,7 +61,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool
from synapse.metrics import SERVER_NAME_LABEL, register_threadpool
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -611,12 +611,6 @@ class DatabasePool:
)
self.updates = BackgroundUpdater(hs, self)
LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self.updates.get_status()},
)
self._previous_txn_total_time = 0.0
self._current_txn_total_time = 0.0

View File

@@ -22,6 +22,7 @@
import logging
from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_conn
from synapse.storage.databases.main.events import PersistEventsStore
@@ -40,6 +41,13 @@ logger = logging.getLogger(__name__)
DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True)
background_update_status = LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=["database_name", SERVER_NAME_LABEL],
)
class Databases(Generic[DataStoreT]):
"""The various databases.
@@ -143,6 +151,15 @@ class Databases(Generic[DataStoreT]):
db_conn.close()
# Track the background update status for each database
background_update_status.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(database.name(), server_name): database.updates.get_status()
for database in self.databases
},
)
# Sanity check that we have actually configured all the required stores.
if not main:
raise Exception("No 'main' database configured")

View File

@@ -84,6 +84,13 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
federation_known_servers_gauge = LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class EventIdMembership:
"""Returned by `get_membership_from_event_ids`"""
@@ -116,11 +123,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
1,
self._count_known_servers,
)
LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self._known_servers_count},
federation_known_servers_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): self._known_servers_count},
)
@wrap_as_background_process("_count_known_servers")

View File

@@ -84,9 +84,6 @@ class AbstractObservableDeferred(Generic[_T], metaclass=abc.ABCMeta):
This returns a brand new deferred that is resolved when the underlying
deferred is resolved. Interacting with the returned deferred does not
effect the underlying deferred.
Note that the returned Deferred doesn't follow the Synapse logcontext rules -
you will probably want to `make_deferred_yieldable` it.
"""
...
@@ -100,11 +97,6 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
Cancelling or otherwise resolving an observer will not affect the original
ObservableDeferred.
NB that it does not attempt to do anything with logcontexts; in general
you should probably make_deferred_yieldable the deferreds
returned by `observe`, and ensure that the original deferred runs its
callbacks in the sentinel logcontext.
"""
__slots__ = ["_deferred", "_observers", "_result"]
@@ -861,16 +853,12 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
"""Prevent a `Deferred` from being cancelled by wrapping it in another `Deferred`.
Args:
deferred: The `Deferred` to protect against cancellation. Must not follow the
Synapse logcontext rules.
deferred: The `Deferred` to protect against cancellation.
Returns:
A new `Deferred`, which will contain the result of the original `Deferred`.
The new `Deferred` will not propagate cancellation through to the original.
When cancelled, the new `Deferred` will fail with a `CancelledError`.
The new `Deferred` will not follow the Synapse logcontext rules and should be
wrapped with `make_deferred_yieldable`.
"""
new_deferred: "defer.Deferred[T]" = defer.Deferred()
deferred.chainDeferred(new_deferred)
@@ -896,8 +884,7 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
resolve with a `CancelledError` until the original awaitable resolves.
Args:
deferred: The coroutine or `Deferred` to protect against cancellation. May
optionally follow the Synapse logcontext rules.
deferred: The coroutine or `Deferred` to protect against cancellation.
Returns:
A new `Deferred`, which will contain the result of the original coroutine or
@@ -906,10 +893,6 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
When cancelled, the new `Deferred` will wait until the original coroutine or
`Deferred` resolves before failing with a `CancelledError`.
The new `Deferred` will follow the Synapse logcontext rules if `awaitable`
follows the Synapse logcontext rules. Otherwise the new `Deferred` should be
wrapped with `make_deferred_yieldable`.
"""
# First, convert the awaitable into a `Deferred`.

View File

@@ -295,9 +295,6 @@ class DeferredCache(Generic[KT, VT]):
*original* `value`, (c) any future calls to `get()` will complete with the
result from the *new* `value`.
It is expected that `value` does *not* follow the synapse logcontext rules - ie,
if it is incomplete, it runs its callbacks in the sentinel context.
Args:
key: Key to be set
value: a deferred which will complete with a result to add to the cache

View File

@@ -234,11 +234,9 @@ class ResponseCache(Generic[KV]):
) -> RV:
"""Wrap together a *get* and *set* call, taking care of logcontexts
First looks up the key in the cache, and if it is present makes it
follow the synapse logcontext rules and returns it.
First looks up the key in the cache, and if present, returns it.
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
follow the synapse logcontext rules, and adds the result to the cache.
Otherwise, makes a call to *callback(*args, **kwargs)* and adds the result to the cache.
Example usage:

View File

@@ -1,250 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import functools
import sys
from types import GeneratorType
from typing import Any, Callable, Generator, List, TypeVar, cast
from typing_extensions import ParamSpec
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
# Tracks if we've already patched inlineCallbacks
_already_patched = False
T = TypeVar("T")
P = ParamSpec("P")
def do_patch() -> None:
"""
Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit
"""
from synapse.logging.context import current_context
global _already_patched
orig_inline_callbacks = defer.inlineCallbacks
if _already_patched:
return
def new_inline_callbacks(
f: Callable[P, Generator["Deferred[object]", object, T]],
) -> Callable[P, "Deferred[T]"]:
@functools.wraps(f)
def wrapped(*args: P.args, **kwargs: P.kwargs) -> "Deferred[T]":
start_context = current_context()
changes: List[str] = []
orig: Callable[P, "Deferred[T]"] = orig_inline_callbacks(
_check_yield_points(f, changes)
)
try:
res: "Deferred[T]" = orig(*args, **kwargs)
except Exception:
if current_context() != start_context:
for err in changes:
print(err, file=sys.stderr)
err = "%s changed context from %s to %s on exception" % (
f,
start_context,
current_context(),
)
print(err, file=sys.stderr)
raise Exception(err)
raise
if not isinstance(res, Deferred) or res.called:
if current_context() != start_context:
for err in changes:
print(err, file=sys.stderr)
err = "Completed %s changed context from %s to %s" % (
f,
start_context,
current_context(),
)
# print the error to stderr because otherwise all we
# see in travis-ci is the 500 error
print(err, file=sys.stderr)
raise Exception(err)
return res
if current_context():
err = (
"%s returned incomplete deferred in non-sentinel context "
"%s (start was %s)"
) % (f, current_context(), start_context)
print(err, file=sys.stderr)
raise Exception(err)
def check_ctx(r: T) -> T:
if current_context() != start_context:
for err in changes:
print(err, file=sys.stderr)
err = "%s completion of %s changed context from %s to %s" % (
"Failure" if isinstance(r, Failure) else "Success",
f,
start_context,
current_context(),
)
print(err, file=sys.stderr)
raise Exception(err)
return r
res.addBoth(check_ctx)
return res
return wrapped
defer.inlineCallbacks = new_inline_callbacks
_already_patched = True
def _check_yield_points(
f: Callable[P, Generator["Deferred[object]", object, T]],
changes: List[str],
) -> Callable:
"""Wraps a generator that is about to be passed to defer.inlineCallbacks
checking that after every yield the log contexts are correct.
It's perfectly valid for log contexts to change within a function, e.g. due
to new Measure blocks, so such changes are added to the given `changes`
list instead of triggering an exception.
Args:
f: generator function to wrap
changes: A list of strings detailing how the contexts
changed within a function.
Returns:
function
"""
from synapse.logging.context import current_context
@functools.wraps(f)
def check_yield_points_inner(
*args: P.args, **kwargs: P.kwargs
) -> Generator["Deferred[object]", object, T]:
gen = f(*args, **kwargs)
# We only patch if we have a native generator function, as we rely on
# `gen.gi_frame`.
if not isinstance(gen, GeneratorType):
ret = yield from gen
return ret
last_yield_line_no = gen.gi_frame.f_lineno
result: Any = None
while True:
expected_context = current_context()
try:
isFailure = isinstance(result, Failure)
if isFailure:
d = result.throwExceptionIntoGenerator(gen)
else:
d = gen.send(result)
except StopIteration as e:
if current_context() != expected_context:
# This happens when the context is lost sometime *after* the
# final yield and returning. E.g. we forgot to yield on a
# function that returns a deferred.
#
# We don't raise here as it's perfectly valid for contexts to
# change in a function, as long as it sets the correct context
# on resolving (which is checked separately).
err = (
"Function %r returned and changed context from %s to %s,"
" in %s between %d and end of func"
% (
f.__qualname__,
expected_context,
current_context(),
f.__code__.co_filename,
last_yield_line_no,
)
)
changes.append(err)
# The `StopIteration` contains the return value from the
# generator.
return cast(T, e.value)
frame = gen.gi_frame
if isinstance(d, defer.Deferred) and not d.called:
# This happens if we yield on a deferred that doesn't follow
# the log context rules without wrapping in a `make_deferred_yieldable`.
# We raise here as this should never happen.
if current_context():
err = (
"%s yielded with context %s rather than sentinel,"
" yielded on line %d in %s"
% (
frame.f_code.co_name,
current_context(),
frame.f_lineno,
frame.f_code.co_filename,
)
)
raise Exception(err)
# the wrapped function yielded a Deferred: yield it back up to the parent
# inlineCallbacks().
try:
result = yield d
except Exception:
# this will fish an earlier Failure out of the stack where possible, and
# thus is preferable to passing in an exception to the Failure
# constructor, since it results in less stack-mangling.
result = Failure()
if current_context() != expected_context:
# This happens because the context is lost sometime *after* the
# previous yield and *after* the current yield. E.g. the
# deferred we waited on didn't follow the rules, or we forgot to
# yield on a function between the two yield points.
#
# We don't raise here as its perfectly valid for contexts to
# change in a function, as long as it sets the correct context
# on resolving (which is checked separately).
err = (
"%s changed context from %s to %s, happened between lines %d and %d in %s"
% (
frame.f_code.co_name,
expected_context,
current_context(),
last_yield_line_no,
frame.f_lineno,
frame.f_code.co_filename,
)
)
changes.append(err)
last_yield_line_no = frame.f_lineno
return check_yield_points_inner

View File

@@ -131,22 +131,28 @@ def _get_counts_from_rate_limiter_instance(
# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
LaterGauge(
sleep_affected_hosts_gauge = LaterGauge(
name="synapse_rate_limit_sleep_affected_hosts",
desc="Number of hosts that had requests put to sleep",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
caller=lambda: _get_counts_from_rate_limiter_instance(
)
sleep_affected_hosts_gauge.register_hook(
homeserver_instance_id=None,
hook=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_sleep()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
)
),
)
LaterGauge(
reject_affected_hosts_gauge = LaterGauge(
name="synapse_rate_limit_reject_affected_hosts",
desc="Number of hosts that had requests rejected",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
caller=lambda: _get_counts_from_rate_limiter_instance(
)
reject_affected_hosts_gauge.register_hook(
homeserver_instance_id=None,
hook=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_reject()
for ratelimiter in rate_limiter_instance.ratelimiters.values()

View File

@@ -44,6 +44,13 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
running_tasks_gauge = LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
)
class TaskScheduler:
"""
This is a simple task scheduler designed for resumable tasks. Normally,
@@ -130,11 +137,9 @@ class TaskScheduler:
TaskScheduler.SCHEDULE_INTERVAL_MS,
)
LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._running_tasks)},
running_tasks_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self._running_tasks)},
)
def register_action(

View File

@@ -21,9 +21,4 @@
from twisted.trial import util
from synapse.util.patch_inline_callbacks import do_patch
# attempt to do the patch before we load any synapse code
do_patch()
util.DEFAULT_TIMEOUT_DURATION = 20

View File

@@ -18,11 +18,18 @@
# [This file includes modifications made by New Vector Limited]
#
#
from typing import Dict, Protocol, Tuple
from typing import Dict, NoReturn, Protocol, Tuple
from prometheus_client.core import Sample
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
from synapse.metrics import (
REGISTRY,
SERVER_NAME_LABEL,
InFlightGauge,
LaterGauge,
all_later_gauges_to_clean_up_on_shutdown,
generate_latest,
)
from synapse.util.caches.deferred_cache import DeferredCache
from tests import unittest
@@ -285,6 +292,95 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
class LaterGaugeTests(unittest.HomeserverTestCase):
def setUp(self) -> None:
super().setUp()
self.later_gauge = LaterGauge(
name="foo",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
def tearDown(self) -> None:
super().tearDown()
REGISTRY.unregister(self.later_gauge)
all_later_gauges_to_clean_up_on_shutdown.pop(self.later_gauge.name, None)
def test_later_gauge_multiple_servers(self) -> None:
"""
Test that LaterGauge metrics are reported correctly across multiple servers. We
will have an metrics entry for each homeserver that is labeled with the
`server_name` label.
"""
self.later_gauge.register_hook(
homeserver_instance_id="123", hook=lambda: {("hs1",): 1}
)
self.later_gauge.register_hook(
homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
)
metrics_map = get_latest_metrics()
# Find the metrics from both homeservers
hs1_metric = 'foo{server_name="hs1"}'
hs1_metric_value = metrics_map.get(hs1_metric)
self.assertIsNotNone(
hs1_metric_value,
f"Missing metric {hs1_metric} in metrics {metrics_map}",
)
self.assertEqual(hs1_metric_value, "1.0")
hs2_metric = 'foo{server_name="hs2"}'
hs2_metric_value = metrics_map.get(hs2_metric)
self.assertIsNotNone(
hs2_metric_value,
f"Missing metric {hs2_metric} in metrics {metrics_map}",
)
self.assertEqual(hs2_metric_value, "2.0")
def test_later_gauge_hook_exception(self) -> None:
"""
Test that LaterGauge metrics are collected across multiple servers even if one
hooks is throwing an exception.
"""
def raise_exception() -> NoReturn:
raise Exception("fake error generating data")
# Make the hook for hs1 throw an exception
self.later_gauge.register_hook(
homeserver_instance_id="123", hook=raise_exception
)
# Metrics from hs2 still work fine
self.later_gauge.register_hook(
homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
)
metrics_map = get_latest_metrics()
# Since we encountered an exception while trying to collect metrics from hs1, we
# don't expect to see it here.
hs1_metric = 'foo{server_name="hs1"}'
hs1_metric_value = metrics_map.get(hs1_metric)
self.assertIsNone(
hs1_metric_value,
(
"Since we encountered an exception while trying to collect metrics from hs1"
f"we don't expect to see it the metrics_map {metrics_map}"
),
)
# We should still see metrics from hs2 though
hs2_metric = 'foo{server_name="hs2"}'
hs2_metric_value = metrics_map.get(hs2_metric)
self.assertIsNotNone(
hs2_metric_value,
f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
)
self.assertEqual(hs2_metric_value, "2.0")
def get_latest_metrics() -> Dict[str, str]:
"""
Collect the latest metrics from the registry and parse them into an easy to use map.

View File

@@ -32,7 +32,6 @@ from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocati
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.replication.http import ReplicationRestResource
from synapse.replication.tcp.client import ReplicationDataHandler
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.protocol import (
ClientReplicationStreamProtocol,
ServerReplicationStreamProtocol,
@@ -97,7 +96,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
self.test_handler = self._build_replication_data_handler()
self.worker_hs._replication_data_handler = self.test_handler # type: ignore[attr-defined]
repl_handler = ReplicationCommandHandler(self.worker_hs)
repl_handler = self.worker_hs.get_replication_command_handler()
self.client = ClientReplicationStreamProtocol(
self.worker_hs,
"client",

View File

@@ -1145,6 +1145,9 @@ def setup_test_homeserver(
reactor=reactor,
)
# Register the cleanup hook
cleanup_func(hs.cleanup)
# Install @cache_in_self attributes
for key, val in kwargs.items():
setattr(hs, "_" + key, val)

View File

@@ -355,7 +355,7 @@ class DescriptorTestCase(unittest.TestCase):
d = obj.fn(1)
self.assertEqual(
current_context(),
SENTINEL_CONTEXT,
c1,
)
yield d
self.fail("No exception thrown")
@@ -849,7 +849,7 @@ class CachedListDescriptorTestCase(unittest.TestCase):
# start the lookup off
d1 = obj.list_fn([10, 20], 2)
self.assertEqual(current_context(), SENTINEL_CONTEXT)
self.assertEqual(current_context(), c1)
r = yield d1
self.assertEqual(current_context(), c1)
obj.mock.assert_called_once_with({10, 20}, 2)