Mirror of https://github.com/element-hq/synapse.git, synced 2025-12-11 01:40:27 +00:00

Compare commits: anoa/fix_m...madlittlem (11 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 6ce2f3e59d | |
| | 93044f4c5b | |
| | c7a80b63ec | |
| | 1f384b0e21 | |
| | b2997a8f20 | |
| | bff4a11b3f | |
| | 3e66e0a1b8 | |
| | 0c8759bbb6 | |
| | 4303879cfe | |
| | 3742b3b3fb | |
| | 224cb3f827 | |
changelog.d/18791.misc (new file, 1 line)

@@ -0,0 +1 @@
+Fix `LaterGauge` metrics to collect from all servers.
changelog.d/18871.misc (new file, 1 line)

@@ -0,0 +1 @@
+Store the `LoggingContext` in a `ContextVar` instead of a thread-local variable.
changelog.d/18878.docker (new file, 1 line)

@@ -0,0 +1 @@
+Suppress the bulk "Applying schema" log noise when `SYNAPSE_LOG_TESTING` is set.
docker/conf/log.config

@@ -77,6 +77,13 @@ loggers:
   #}
   synapse.visibility.filtered_event_debug:
     level: DEBUG
 
+  {#
+    If Synapse is under test, we don't care about seeing the "Applying schema" log
+    lines at the INFO level every time we run the tests (it's 100 lines of bulk).
+  #}
+  synapse.storage.prepare_database:
+    level: WARN
 {% endif %}
 
 root:
docs/log_contexts.md

@@ -1,12 +1,12 @@
 # Log Contexts
 
 To help track the processing of individual requests, synapse uses a
-'`log context`' to track which request it is handling at any given
-moment. This is done via a thread-local variable; a `logging.Filter` is
-then used to fish the information back out of the thread-local variable
+`LoggingContext` to track which request it is handling at any given
+moment. This is done via a `ContextVar` variable; a `logging.Filter` is
+then used to fish the information back out of the `ContextVar` variable
 and add it to each log record.
 
-Logcontexts are also used for CPU and database accounting, so that we
+Log contexts are also used for CPU and database accounting, so that we
 can track which requests were responsible for high CPU use or database
 activity.

@@ -14,18 +14,11 @@ The `synapse.logging.context` module provides facilities for managing
 the current log context (as well as providing the `LoggingContextFilter`
 class).
 
 Asynchronous functions make the whole thing complicated, so this document describes
 how it all works, and how to write code which follows the rules.
 
-In this document, "awaitable" refers to any object which can be `await`ed. In the context of
-Synapse, that normally means either a coroutine or a Twisted
+In this document, "awaitable" refers to any object which can be `await`ed. In the
+context of Synapse, that normally means either a coroutine or a Twisted
 [`Deferred`](https://twistedmatrix.com/documents/current/api/twisted.internet.defer.Deferred.html).
 
-## Logcontexts without asynchronous code
-
-In the absence of any asynchronous voodoo, things are simple enough. As with
-any code of this nature, the rule is that our function should leave
-things as it found them:
+## Basic usage
 
 ```python
 from synapse.logging import context  # omitted from future snippets
@@ -45,7 +38,7 @@ def do_request_handling():
     logger.debug("phew")  # this will be logged against request_id
 ```
 
-LoggingContext implements the context management methods, so the above
+`LoggingContext` implements the context management methods, so the above
 can be written much more succinctly as:
 
 ```python
@@ -59,197 +52,76 @@ def do_request_handling():
     logger.debug("phew")
 ```
 
-## Using logcontexts with awaitables
-
-Awaitables break the linear flow of code so that there is no longer a single entry point
-where we should set the logcontext and a single exit point where we should remove it.
-
-Consider the example above, where `do_request_handling` needs to do some
-blocking operation, and returns an awaitable:
+### The `sentinel` context
+
+The default context is `context.SENTINEL_CONTEXT`, a sentinel value which
+represents the root context. This is what is used when there is no other context set.
+
+No CPU/database usage metrics are recorded against the `sentinel` context.
+
+Ideally, nothing from the Synapse homeserver would be logged against the `sentinel`
+context, as we want to know where the logs came from. In practice this is not always
+the case yet, especially outside of request handling.
+
+Previously, the `sentinel` context played a bigger role, when we had to deal carefully
+with thread-local storage: we had to make sure not to leak one context into another
+task after we gave up control to the reactor, so we set the `sentinel` context before
+yielding.
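To make the sentinel behaviour concrete, here is a minimal sketch (an illustration against the public names in `synapse.logging.context`, not part of the changeset):

```python
from synapse.logging.context import (
    SENTINEL_CONTEXT,
    LoggingContext,
    current_context,
)

# With no context set, lookups fall back to the sentinel root context,
# and no CPU/database accounting happens.
assert current_context() is SENTINEL_CONTEXT

with LoggingContext("request-1"):
    # Inside the block, logs and resource usage are attributed to "request-1".
    assert str(current_context()) == "request-1"

# On exit we are back at the sentinel.
assert current_context() is SENTINEL_CONTEXT
```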
+### `PreserveLoggingContext`
+
+In a similar vein (it, too, is less central than it used to be), `PreserveLoggingContext`
+is another context-manager helper: a little bit of syntactic sugar to set the current
+log context (without finishing it) and restore the previous context on exit.
+
-```python
-async def handle_request(request_id):
-    with context.LoggingContext() as request_context:
-        request_context.request = request_id
-        await do_request_handling()
-```
+```python
+import logging
+
+from synapse.logging import context
+
+logger = logging.getLogger(__name__)
+
+
+def main() -> None:
+    with context.LoggingContext("main"):
+        task_context = context.LoggingContext("task")
+
+        with task_context:
+            logger.debug("foo")
+
+        # Bad: will throw an error because `task_context` is already finished
+        with task_context:
+            logger.debug("bar")
+
+        logger.debug("finished")
+```
 
-In the above flow:
-
--   The logcontext is set
--   `do_request_handling` is called, and returns an awaitable
--   `handle_request` awaits the awaitable
--   Execution of `handle_request` is suspended
-
-So we have stopped processing the request (and will probably go on to
-start processing the next), without clearing the logcontext.
-
-To circumvent this problem, synapse code assumes that, wherever you have
-an awaitable, you will want to `await` it. To that end, wherever
-functions return awaitables, we adopt the following conventions:
-
-**Rules for functions returning awaitables:**
-
-> - If the awaitable is already complete, the function returns with the
->   same logcontext it started with.
-> - If the awaitable is incomplete, the function clears the logcontext
->   before returning; when the awaitable completes, it restores the
->   logcontext before running any callbacks.
-
-That sounds complicated, but actually it means a lot of code (including
-the example above) "just works". There are two cases:
-
--   If `do_request_handling` returns a completed awaitable, then the
-    logcontext will still be in place. In this case, execution will
-    continue immediately after the `await`; the "finished" line will
-    be logged against the right context, and the `with` block restores
-    the original context before we return to the caller.
--   If the returned awaitable is incomplete, `do_request_handling` clears
-    the logcontext before returning. The logcontext is therefore clear
-    when `handle_request` `await`s the awaitable.
-
-    Once `do_request_handling`'s awaitable completes, it will reinstate
-    the logcontext, before running the second half of `handle_request`,
-    so again the "finished" line will be logged against the right context,
-    and the `with` block restores the original context.
-
-As an aside, it's worth noting that `handle_request` follows our rules
-- though that only matters if the caller has its own logcontext which it
-cares about.
-
-The following sections describe pitfalls and helpful patterns when
-implementing these rules.
-
-Always await your awaitables
-----------------------------
-
-Whenever you get an awaitable back from a function, you should `await` on
-it as soon as possible. Do not pass go; do not do any logging; do not
-call any other functions.
+This can be fixed by using `PreserveLoggingContext`:
 
-```python
-async def fun():
-    logger.debug("starting")
-    await do_some_stuff()       # just like this
-
-    coro = more_stuff()
-    result = await coro         # also fine, of course
-
-    return result
-```
+```python
+import logging
+
+from synapse.logging import context
+from synapse.logging.context import PreserveLoggingContext
+
+logger = logging.getLogger(__name__)
+
+
+def main() -> None:
+    with context.LoggingContext("main"):
+        task_context = context.LoggingContext("task")
+
+        with PreserveLoggingContext(task_context):
+            logger.debug("foo")
+        with PreserveLoggingContext(task_context):
+            logger.debug("bar")
+
+        logger.debug("finished")  # this will be logged against main
+```
 
-Provided this pattern is followed all the way back up the callchain
-to where the logcontext was set, this will make things work out ok:
-provided `do_some_stuff` and `more_stuff` follow the rules above, then
-so will `fun`.
+Or you could equivalently just manage the log context manually via
+`set_current_context`.
 
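A rough sketch of that manual equivalent (illustrative, not part of the changeset; it relies on `set_current_context` returning the previously-active context, as the `synapse/logging/context.py` hunks further down show):

```python
import logging

from synapse.logging import context

logger = logging.getLogger(__name__)


def manual_switch(task_context: context.LoggingContext) -> None:
    # Set our context, remembering whatever was active before.
    previous = context.set_current_context(task_context)
    try:
        logger.debug("foo")  # logged against `task_context`
    finally:
        # Restore the previous context, as `PreserveLoggingContext` does on exit.
        context.set_current_context(previous)
```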
-It's all too easy to forget to `await`: for instance if we forgot that
-`do_some_stuff` returned an awaitable, we might plough on regardless. This
-leads to a mess; it will probably work itself out eventually, but not
-before a load of stuff has been logged against the wrong context.
-(Normally, other things will break, more obviously, if you forget to
-`await`, so this tends not to be a major problem in practice.)
-
-Of course sometimes you need to do something a bit fancier with your
-awaitable - not all code follows the linear A-then-B-then-C pattern.
-Notes on implementing more complex patterns are in later sections.
 
-## Where you create a new awaitable, make it follow the rules
-
-Most of the time, an awaitable comes from another synapse function.
-Sometimes, though, we need to make up a new awaitable, or we get an awaitable
-back from external code. We need to make it follow our rules.
-
-The easy way to do it is by using `context.make_deferred_yieldable`. Suppose we want to implement
-`sleep`, which returns a deferred which will run its callbacks after a
-given number of seconds. That might look like:
-
-```python
-# not a logcontext-rules-compliant function
-def get_sleep_deferred(seconds):
-    d = defer.Deferred()
-    reactor.callLater(seconds, d.callback, None)
-    return d
-```
-
-That doesn't follow the rules, but we can fix it by calling it through
-`context.make_deferred_yieldable`:
-
-```python
-async def sleep(seconds):
-    return await context.make_deferred_yieldable(get_sleep_deferred(seconds))
-```
 
 ## Fire-and-forget
 
 Sometimes you want to fire off a chain of execution, but not wait for
 its result. That might look a bit like this:
 
 ```python
 async def do_request_handling():
     await foreground_operation()
 
     # *don't* do this
     background_operation()
 
     logger.debug("Request handling complete")
 
 
 async def background_operation():
     await first_background_step()
     logger.debug("Completed first step")
     await second_background_step()
     logger.debug("Completed second step")
 ```
 
-The above code does a couple of steps in the background after
-`do_request_handling` has finished. The log lines are still logged
-against the `request_context` logcontext, which may or may not be
-desirable. There are two big problems with the above, however. The first
-problem is that, if `background_operation` returns an incomplete
-awaitable, it will expect its caller to `await` immediately, so will have
-cleared the logcontext. In this example, that means that 'Request
-handling complete' will be logged without any context.
-
-The second problem, which is potentially even worse, is that when the
-awaitable returned by `background_operation` completes, it will restore
-the original logcontext. There is nothing waiting on that awaitable, so
-the logcontext will leak into the reactor and possibly get attached to
-some arbitrary future operation.
-
-There are two potential solutions to this.
-
-One option is to surround the call to `background_operation` with a
-`PreserveLoggingContext` call. That will reset the logcontext before
-starting `background_operation` (so the context restored when the
-deferred completes will be the empty logcontext), and will restore the
-current logcontext before continuing the foreground process:
-
-```python
-async def do_request_handling():
-    await foreground_operation()
-
-    # start background_operation off in the empty logcontext, to
-    # avoid leaking the current context into the reactor.
-    with PreserveLoggingContext():
-        background_operation()
-
-    # this will now be logged against the request context
-    logger.debug("Request handling complete")
-```
-
-Obviously that option means that the operations done in
-`background_operation` would not be logged against a logcontext
-(though that might be fixed by setting a different logcontext via a
-`with LoggingContext(...)` in `background_operation`).
-
-The second option is to use `context.run_in_background`, which wraps a
-function so that it doesn't reset the logcontext even when it returns
-an incomplete awaitable, and adds a callback to the returned awaitable to
-reset the logcontext. In other words, it turns a function that follows
-the Synapse rules about logcontexts and awaitables into one which behaves
-more like an external function --- the opposite operation to that
-described in the previous section. It can be used like this:
+To drive an awaitable in the background, you can use `context.run_in_background`:
 
 ```python
 async def do_request_handling():
@@ -261,104 +133,13 @@ async def do_request_handling():
     logger.debug("Request handling complete")
 ```
 
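The middle of that code block falls outside the captured hunk, so as an illustrative reconstruction only (using `foreground_operation`, `background_operation` and `logger` from the earlier snippet), the replacement example reads roughly as:

```python
from synapse.logging import context


async def do_request_handling() -> None:
    await foreground_operation()

    # Fire off background_operation without awaiting it: run_in_background
    # starts it in the current logcontext, so its log lines stay attributed
    # to this request.
    context.run_in_background(background_operation)

    logger.debug("Request handling complete")
```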
 ## Passing synapse deferreds into third-party functions
 
 A typical example of this is where we want to collect together two or
 more awaitables via `defer.gatherResults`:
 
 ```python
 a1 = operation1()
 a2 = operation2()
 a3 = defer.gatherResults([a1, a2])
 ```
 
 This is really a variation of the fire-and-forget problem above, in that
 we are firing off `a1` and `a2` without awaiting on them. The difference
 is that we now have third-party code attached to their callbacks. Anyway,
 either technique given in the [Fire-and-forget](#fire-and-forget)
 section will work.
 
-Of course, the new awaitable returned by `gather` needs to be
-wrapped in order to make it follow the logcontext rules before we can
-yield it, as described in [Where you create a new awaitable, make it
-follow the
-rules](#where-you-create-a-new-awaitable-make-it-follow-the-rules).
-
 So, option one: reset the logcontext before starting the operations to
 be gathered:
 
 ```python
 async def do_request_handling():
     with PreserveLoggingContext():
         a1 = operation1()
         a2 = operation2()
     result = await defer.gatherResults([a1, a2])
 ```
 
 In this case particularly, though, option two, of using
 `context.run_in_background`, almost certainly makes more sense, so that
 `operation1` and `operation2` are both logged against the original
 logcontext. This looks like:
 
 ```python
 async def do_request_handling():
     a1 = context.run_in_background(operation1)
     a2 = context.run_in_background(operation2)
 
-    result = await make_deferred_yieldable(defer.gatherResults([a1, a2]))
+    result = await defer.gatherResults([a1, a2])
 ```
 
-## A note on garbage-collection of awaitable chains
-
-It turns out that our logcontext rules do not play nicely with awaitable
-chains which get orphaned and garbage-collected.
-
-Imagine we have some code that looks like this:
-
-```python
-listener_queue = []
-
-def on_something_interesting():
-    for d in listener_queue:
-        d.callback("foo")
-
-async def await_something_interesting():
-    new_awaitable = defer.Deferred()
-    listener_queue.append(new_awaitable)
-
-    with PreserveLoggingContext():
-        await new_awaitable
-```
-
-Obviously, the idea here is that we have a bunch of things which are
-waiting for an event. (It's just an example of the problem here, but a
-relatively common one.)
-
-Now let's imagine two further things happen. First of all, whatever was
-waiting for the interesting thing goes away. (Perhaps the request times
-out, or something *even more* interesting happens.)
-
-Secondly, let's suppose that we decide that the interesting thing is
-never going to happen, and we reset the listener queue:
-
-```python
-def reset_listener_queue():
-    listener_queue.clear()
-```
-
-So, both ends of the awaitable chain have now dropped their references,
-and the awaitable chain is now orphaned, and will be garbage-collected at
-some point. Note that `await_something_interesting` is a coroutine,
-which Python implements as a generator function. When Python
-garbage-collects generator functions, it gives them a chance to
-clean up by making the `await` (or `yield`) raise a `GeneratorExit`
-exception. In our case, that means that the `__exit__` handler of
-`PreserveLoggingContext` will carefully restore the request context, but
-there is now nothing waiting for its return, so the request context is
-never cleared.
-
-To reiterate, this problem only arises when *both* ends of an awaitable
-chain are dropped. Dropping the reference to an awaitable you're
-supposed to be awaiting is bad practice, so this doesn't
-actually happen too much. Unfortunately, when it does happen, it will
-lead to leaked logcontexts which are incredibly hard to track down.
+`background_process_metrics.run_as_background_process` also exists if you want some
+automatic tracing and metrics for the background task.
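For completeness, a small sketch of that alternative (illustrative; the argument order matches the `_run_as_background_process(desc, self.server_name, func, ...)` call visible in the `ModuleApi` hunk further down):

```python
from synapse.metrics.background_process_metrics import run_as_background_process


def start_periodic_cleanup(server_name: str) -> None:
    # Fire-and-forget with automatic tracing/metrics: the task runs in its
    # own "background process" logcontext rather than the request's.
    run_as_background_process("periodic_cleanup", server_name, do_cleanup)


async def do_cleanup() -> None:
    ...
```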
synapse/__init__.py

@@ -86,12 +86,5 @@ import synapse.util  # noqa: E402
 
 __version__ = synapse.util.SYNAPSE_VERSION
 
-if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
-    # We import here so that we don't have to install a bunch of deps when
-    # running the packaging tox test.
-    from synapse.util.patch_inline_callbacks import do_patch
-
-    do_patch()
-
 
 check_rust_lib_up_to_date()
@@ -153,9 +153,13 @@ def get_registered_paths_for_default(
     """
 
     hs = MockHomeserver(base_config, worker_app)
 
     # TODO We only do this to avoid an error, but don't need the database etc
     hs.setup()
-    return get_registered_paths_for_hs(hs)
+    registered_paths = get_registered_paths_for_hs(hs)
+    hs.cleanup()
+
+    return registered_paths
 
 
 def elide_http_methods_if_unconflicting(
synapse/_scripts/synapse_port_db.py

@@ -99,6 +99,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.prepare_database import prepare_database
 from synapse.types import ISynapseReactor
 from synapse.util import SYNAPSE_VERSION, Clock
+from synapse.util.stringutils import random_string
 
 # Cast safety: Twisted does some naughty magic which replaces the
 # twisted.internet.reactor module with a Reactor instance at runtime.

@@ -323,6 +324,7 @@ class MockHomeserver:
         self.config = config
         self.hostname = config.server.server_name
         self.version_string = SYNAPSE_VERSION
+        self.instance_id = random_string(5)
 
     def get_clock(self) -> Clock:
         return self.clock

@@ -330,6 +332,9 @@ class MockHomeserver:
     def get_reactor(self) -> ISynapseReactor:
         return reactor
 
+    def get_instance_id(self) -> str:
+        return self.instance_id
+
     def get_instance_name(self) -> str:
         return "master"

@@ -685,7 +690,15 @@ class Porter:
             )
             prepare_database(db_conn, engine, config=self.hs_config)
             # Type safety: ignore that we're using Mock homeservers here.
-            store = Store(DatabasePool(hs, db_config, engine), db_conn, hs)  # type: ignore[arg-type]
+            store = Store(
+                DatabasePool(
+                    hs,  # type: ignore[arg-type]
+                    db_config,
+                    engine,
+                ),
+                db_conn,
+                hs,  # type: ignore[arg-type]
+            )
             db_conn.commit()
 
         return store
synapse/app/_base.py

@@ -601,6 +601,12 @@ async def start(hs: "HomeServer") -> None:
     hs.get_datastores().main.db_pool.start_profiling()
     hs.get_pusherpool().start()
 
+    # Register background tasks required by this server. This must be done
+    # somewhat manually due to the background tasks not being registered
+    # unless handlers are instantiated.
+    if hs.config.worker.run_background_tasks:
+        hs.start_background_tasks()
+
     # Log when we start the shut down process.
     hs.get_reactor().addSystemEventTrigger(
         "before", "shutdown", logger.info, "Shutting down..."
synapse/federation/send_queue.py

@@ -37,6 +37,7 @@ Events are replicated via a separate events stream.
 """
 
 import logging
+from enum import Enum
 from typing import (
     TYPE_CHECKING,
     Dict,

@@ -67,6 +68,25 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+class QueueNames(str, Enum):
+    PRESENCE_MAP = "presence_map"
+    KEYED_EDU = "keyed_edu"
+    KEYED_EDU_CHANGED = "keyed_edu_changed"
+    EDUS = "edus"
+    POS_TIME = "pos_time"
+    PRESENCE_DESTINATIONS = "presence_destinations"
+
+
+queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
+
+for queue_name in QueueNames:
+    queue_name_to_gauge_map[queue_name] = LaterGauge(
+        name=f"synapse_federation_send_queue_{queue_name.value}_size",
+        desc="",
+        labelnames=[SERVER_NAME_LABEL],
+    )
+
+
 class FederationRemoteSendQueue(AbstractFederationSender):
     """A drop in replacement for FederationSender"""

@@ -111,23 +131,16 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         # we make a new function, so we need to make a new function so the inner
         # lambda binds to the queue rather than to the name of the queue which
         # changes. ARGH.
-        def register(name: str, queue: Sized) -> None:
-            LaterGauge(
-                name="synapse_federation_send_queue_%s_size" % (queue_name,),
-                desc="",
-                labelnames=[SERVER_NAME_LABEL],
-                caller=lambda: {(self.server_name,): len(queue)},
+        def register(queue_name: QueueNames, queue: Sized) -> None:
+            queue_name_to_gauge_map[queue_name].register_hook(
+                homeserver_instance_id=hs.get_instance_id(),
+                hook=lambda: {(self.server_name,): len(queue)},
             )
 
-        for queue_name in [
-            "presence_map",
-            "keyed_edu",
-            "keyed_edu_changed",
-            "edus",
-            "pos_time",
-            "presence_destinations",
-        ]:
-            register(queue_name, getattr(self, queue_name))
+        for queue_name in QueueNames:
+            queue = getattr(self, queue_name.value)
+            assert isinstance(queue, Sized)
+            register(queue_name, queue=queue)
 
         self.clock.looping_call(self._clear_queue, 30 * 1000)
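This module-level-gauge-plus-per-instance-hook shape is the pattern repeated in every file below; condensed into an illustrative sketch (the `ExampleQueue` names are invented):

```python
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge

# Created once at import time, so re-creating a homeserver does not try to
# re-register the metric itself with the global REGISTRY.
example_queue_size_gauge = LaterGauge(
    name="synapse_example_queue_size",
    desc="",
    labelnames=[SERVER_NAME_LABEL],
)


class ExampleQueue:
    def __init__(self, hs) -> None:
        self.server_name = hs.hostname
        self.queue: list = []
        # Each homeserver instance contributes its own hook; collect() then
        # yields one sample per registered instance, labelled by server name.
        example_queue_size_gauge.register_hook(
            homeserver_instance_id=hs.get_instance_id(),
            hook=lambda: {(self.server_name,): len(self.queue)},
        )
```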
synapse/federation/sender/__init__.py

@@ -199,6 +199,24 @@ sent_pdus_destination_dist_total = Counter(
     labelnames=[SERVER_NAME_LABEL],
 )
 
+transaction_queue_pending_destinations_gauge = LaterGauge(
+    name="synapse_federation_transaction_queue_pending_destinations",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+transaction_queue_pending_pdus_gauge = LaterGauge(
+    name="synapse_federation_transaction_queue_pending_pdus",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+transaction_queue_pending_edus_gauge = LaterGauge(
+    name="synapse_federation_transaction_queue_pending_edus",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
 # Time (in s) to wait before trying to wake up destinations that have
 # catch-up outstanding.
 # Please note that rate limiting still applies, so while the loop is

@@ -398,11 +416,9 @@ class FederationSender(AbstractFederationSender):
         # map from destination to PerDestinationQueue
         self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
 
-        LaterGauge(
-            name="synapse_federation_transaction_queue_pending_destinations",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        transaction_queue_pending_destinations_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): sum(
                     1
                     for d in self._per_destination_queues.values()

@@ -410,22 +426,17 @@ class FederationSender(AbstractFederationSender):
                 )
             },
         )
 
-        LaterGauge(
-            name="synapse_federation_transaction_queue_pending_pdus",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        transaction_queue_pending_pdus_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): sum(
                     d.pending_pdu_count() for d in self._per_destination_queues.values()
                 )
             },
         )
-        LaterGauge(
-            name="synapse_federation_transaction_queue_pending_edus",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        transaction_queue_pending_edus_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): sum(
                     d.pending_edu_count() for d in self._per_destination_queues.values()
                 )
synapse/handlers/presence.py

@@ -173,6 +173,18 @@ state_transition_counter = Counter(
     labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
 )
 
+presence_user_to_current_state_size_gauge = LaterGauge(
+    name="synapse_handlers_presence_user_to_current_state_size",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+presence_wheel_timer_size_gauge = LaterGauge(
+    name="synapse_handlers_presence_wheel_timer_size",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
 LAST_ACTIVE_GRANULARITY = 60 * 1000

@@ -779,11 +791,9 @@ class PresenceHandler(BasePresenceHandler):
             EduTypes.PRESENCE, self.incoming_presence
         )
 
-        LaterGauge(
-            name="synapse_handlers_presence_user_to_current_state_size",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
+        presence_user_to_current_state_size_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self.user_to_current_state)},
         )
 
         # The per-device presence state, maps user to devices to per-device presence state.

@@ -882,11 +892,9 @@ class PresenceHandler(BasePresenceHandler):
             60 * 1000,
         )
 
-        LaterGauge(
-            name="synapse_handlers_presence_wheel_timer_size",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self.wheel_timer)},
+        presence_wheel_timer_size_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self.wheel_timer)},
         )
 
         # Used to handle sending of presence to newly joined users/servers
synapse/http/request_metrics.py

@@ -164,11 +164,13 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
     return counts
 
 
-LaterGauge(
+in_flight_requests = LaterGauge(
     name="synapse_http_server_in_flight_requests_count",
     desc="",
     labelnames=["method", "servlet", SERVER_NAME_LABEL],
-    caller=_get_in_flight_counts,
 )
+in_flight_requests.register_hook(
+    homeserver_instance_id=None, hook=_get_in_flight_counts
+)
synapse/logging/context.py

@@ -34,6 +34,7 @@ import logging
 import threading
 import typing
 import warnings
+from contextvars import ContextVar
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,

@@ -653,13 +654,12 @@ class PreserveLoggingContext:
         )
 
 
-_thread_local = threading.local()
-_thread_local.current_context = SENTINEL_CONTEXT
+_current_context: ContextVar[LoggingContextOrSentinel] = ContextVar("current_context")
 
 
 def current_context() -> LoggingContextOrSentinel:
     """Get the current logging context"""
-    return getattr(_thread_local, "current_context", SENTINEL_CONTEXT)
+    return _current_context.get(SENTINEL_CONTEXT)
 
 
 def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel:

@@ -680,7 +680,7 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe
     if current is not context:
         rusage = get_thread_resource_usage()
         current.stop(rusage)
-        _thread_local.current_context = context
+        _current_context.set(context)
         context.start(rusage)
 
     return current
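Why a `ContextVar` rather than a thread-local: each asynchronous task gets its own value even when many tasks interleave on a single reactor thread. A standalone sketch with plain `contextvars` (using `asyncio` for brevity; Synapse itself drives this from Twisted):

```python
import asyncio
from contextvars import ContextVar

current: ContextVar[str] = ContextVar("current")


async def task(name: str) -> str:
    current.set(name)
    await asyncio.sleep(0)  # yield control, letting the other task run
    return current.get()    # still sees its own value, not the other task's


async def main() -> None:
    # Both tasks share one thread, yet each keeps an isolated context value.
    assert await asyncio.gather(task("a"), task("b")) == ["a", "b"]


asyncio.run(main())
```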
@@ -796,7 +796,6 @@ def run_in_background(
     CRITICAL error about an unhandled error will be logged without much
     indication about where it came from.
     """
-    current = current_context()
     try:
         res = f(*args, **kwargs)
     except Exception:

@@ -825,23 +824,6 @@ def run_in_background(
         # optimise out the messing about
         return d
 
-    # The function may have reset the context before returning, so
-    # we need to restore it now.
-    ctx = set_current_context(current)
-
-    # The original context will be restored when the deferred
-    # completes, but there is nothing waiting for it, so it will
-    # get leaked into the reactor or some other function which
-    # wasn't expecting it. We therefore need to reset the context
-    # here.
-    #
-    # (If this feels asymmetric, consider it this way: we are
-    # effectively forking a new thread of execution. We are
-    # probably currently within a ``with LoggingContext()`` block,
-    # which is supposed to have a single entry and exit point. But
-    # by spawning off another deferred, we are effectively
-    # adding a new exit point.)
-    d.addBoth(_set_context_cb, ctx)
     return d
@@ -861,65 +843,20 @@ def run_coroutine_in_background(
     cannot change the log contexts.
     """
 
-    current = current_context()
-    d = defer.ensureDeferred(coroutine)
-
-    # The function may have reset the context before returning, so
-    # we need to restore it now.
-    ctx = set_current_context(current)
-
-    # The original context will be restored when the deferred
-    # completes, but there is nothing waiting for it, so it will
-    # get leaked into the reactor or some other function which
-    # wasn't expecting it. We therefore need to reset the context
-    # here.
-    #
-    # (If this feels asymmetric, consider it this way: we are
-    # effectively forking a new thread of execution. We are
-    # probably currently within a ``with LoggingContext()`` block,
-    # which is supposed to have a single entry and exit point. But
-    # by spawning off another deferred, we are effectively
-    # adding a new exit point.)
-    d.addBoth(_set_context_cb, ctx)
-    return d
+    return defer.ensureDeferred(coroutine)
 
 
 T = TypeVar("T")
 
 
+# TODO: This function is a no-op now and should be removed in a follow-up PR.
 def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
     """Given a deferred, make it follow the Synapse logcontext rules:
 
     If the deferred has completed, essentially does nothing (just returns another
     completed deferred with the result/failure).
 
     If the deferred has not yet completed, resets the logcontext before
     returning a deferred. Then, when the deferred completes, restores the
     current logcontext before running callbacks/errbacks.
 
     (This is more-or-less the opposite operation to run_in_background.)
     """
-    if deferred.called and not deferred.paused:
-        # it looks like this deferred is ready to run any callbacks we give it
-        # immediately. We may as well optimise out the logcontext faffery.
-        return deferred
-
-    # ok, we can't be sure that a yield won't block, so let's reset the
-    # logcontext, and add a callback to the deferred to restore it.
-    prev_context = set_current_context(SENTINEL_CONTEXT)
-    deferred.addBoth(_set_context_cb, prev_context)
     return deferred
 
 
 ResultT = TypeVar("ResultT")
 
 
 def _set_context_cb(result: ResultT, context: LoggingContextOrSentinel) -> ResultT:
     """A callback function which just sets the logging context"""
     set_current_context(context)
     return result
 
 
 def defer_to_thread(
     reactor: "ISynapseReactor", f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
 ) -> "defer.Deferred[R]":
@@ -931,9 +868,6 @@ def defer_to_thread(
     logcontext (so its CPU usage metrics will get attributed to the current
     logcontext). `f` should preserve the logcontext it is given.
 
-    The result deferred follows the Synapse logcontext rules: you should `yield`
-    on it.
-
     Args:
         reactor: The reactor in whose main thread the Deferred will be invoked,
             and whose threadpool we should use for the function.

@@ -971,9 +905,6 @@ def defer_to_threadpool(
     logcontext (so its CPU usage metrics will get attributed to the current
     logcontext). `f` should preserve the logcontext it is given.
 
-    The result deferred follows the Synapse logcontext rules: you should `yield`
-    on it.
-
     Args:
         reactor: The reactor in whose main thread the Deferred will be invoked.
             Normally this will be hs.get_reactor().

@@ -991,18 +922,6 @@ def defer_to_threadpool(
         A Deferred which fires a callback with the result of `f`, or an
         errback if `f` throws an exception.
     """
-    curr_context = current_context()
-    if not curr_context:
-        logger.warning(
-            "Calling defer_to_threadpool from sentinel context: metrics will be lost"
-        )
-        parent_context = None
-    else:
-        assert isinstance(curr_context, LoggingContext)
-        parent_context = curr_context
-
-    def g() -> R:
-        with LoggingContext(str(curr_context), parent_context=parent_context):
-            return f(*args, **kwargs)
-
-    return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
+    return make_deferred_yieldable(
+        threads.deferToThreadPool(reactor, threadpool, f, *args, **kwargs)
+    )
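An illustrative use of `defer_to_thread` under the simplified rules (`blocking_io` is a made-up stand-in):

```python
from synapse.logging.context import defer_to_thread


def blocking_io(path: str) -> bytes:
    # Runs on the reactor's threadpool; per the docstring above it executes
    # with the caller's logcontext, so CPU time is attributed correctly.
    with open(path, "rb") as f:
        return f.read()


async def read_file(reactor, path: str) -> bytes:
    return await defer_to_thread(reactor, blocking_io, path)
```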
synapse/metrics/__init__.py

@@ -73,8 +73,6 @@ logger = logging.getLogger(__name__)
 
 METRICS_PREFIX = "/_synapse/metrics"
 
-all_gauges: Dict[str, Collector] = {}
-
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
 
 SERVER_NAME_LABEL = "server_name"
@@ -163,42 +161,110 @@ class LaterGauge(Collector):
     name: str
     desc: str
     labelnames: Optional[StrSequence] = attr.ib(hash=False)
-    # callback: should either return a value (if there are no labels for this metric),
-    # or dict mapping from a label tuple to a value
-    caller: Callable[
-        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
-    ]
+    _instance_id_to_hook_map: Dict[
+        Optional[str],  # instance_id
+        Callable[
+            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
+        ],
+    ] = attr.ib(factory=dict, hash=False)
+    """
+    Map from homeserver instance_id to a callback. Each callback should either return a
+    value (if there are no labels for this metric), or a dict mapping from a label tuple
+    to a value.
+
+    We use `instance_id` instead of `server_name` because it's possible to have multiple
+    workers running in the same process with the same `server_name`.
+    """
 
     def collect(self) -> Iterable[Metric]:
         # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
         # (we don't enforce it here, one level up).
         g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)  # type: ignore[missing-server-name-label]
 
-        try:
-            calls = self.caller()
-        except Exception:
-            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
-            yield g
-            return
+        for homeserver_instance_id, hook in self._instance_id_to_hook_map.items():
+            try:
+                hook_result = hook()
+            except Exception:
+                logger.exception(
+                    "Exception running callback for LaterGauge(%s) for homeserver_instance_id=%s",
+                    self.name,
+                    homeserver_instance_id,
+                )
+                # Continue to return the rest of the metrics that aren't broken
+                continue
 
-        if isinstance(calls, (int, float)):
-            g.add_metric([], calls)
-        else:
-            for k, v in calls.items():
-                g.add_metric(k, v)
+            if isinstance(hook_result, (int, float)):
+                g.add_metric([], hook_result)
+            else:
+                for k, v in hook_result.items():
+                    g.add_metric(k, v)
 
         yield g
 
+    def register_hook(
+        self,
+        *,
+        homeserver_instance_id: Optional[str],
+        hook: Callable[
+            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
+        ],
+    ) -> None:
+        """
+        Register a callback/hook that will be called to generate metric samples for
+        the gauge.
+
+        Args:
+            homeserver_instance_id: The unique ID for this Synapse process instance
+                (`hs.get_instance_id()`) that this hook is associated with. This can be
+                used later to look up all hooks associated with a given server name in
+                order to unregister them. This should only be omitted for global hooks
+                that work across all homeservers.
+            hook: A callback that should either return a value (if there are no
+                labels for this metric), or a dict mapping from a label tuple to a value.
+        """
+        # We shouldn't have multiple hooks registered for the same homeserver `instance_id`.
+        existing_hook = self._instance_id_to_hook_map.get(homeserver_instance_id)
+        assert existing_hook is None, (
+            f"LaterGauge(name={self.name}) hook already registered for homeserver_instance_id={homeserver_instance_id}. "
+            "This is likely a Synapse bug and you forgot to unregister the previous hooks for "
+            "the server (especially in tests)."
+        )
+
+        self._instance_id_to_hook_map[homeserver_instance_id] = hook
+
+    def unregister_hooks_for_homeserver_instance_id(
+        self, homeserver_instance_id: str
+    ) -> None:
+        """
+        Unregister all hooks associated with the given homeserver `instance_id`. This
+        should be called when a homeserver is shut down to avoid extra hooks sitting
+        around.
+
+        Args:
+            homeserver_instance_id: The unique ID for this Synapse process instance to
+                unregister hooks for (`hs.get_instance_id()`).
+        """
+        self._instance_id_to_hook_map.pop(homeserver_instance_id, None)
+
     def __attrs_post_init__(self) -> None:
         self._register()
 
     def _register(self) -> None:
-        if self.name in all_gauges.keys():
-            logger.warning("%s already registered, reregistering", self.name)
-            REGISTRY.unregister(all_gauges.pop(self.name))
-
         REGISTRY.register(self)
-        all_gauges[self.name] = self
+
+        # We shouldn't have multiple metrics with the same name. Typically, metrics
+        # should be created globally so you shouldn't be running into this and this will
+        # catch any stupid mistakes. The `REGISTRY.register(self)` call above will also
+        # raise an error if the metric already exists but to make things explicit, we'll
+        # also check here.
+        existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name)
+        assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists."
+
+        # Keep track of the gauge so we can clean it up later.
+        all_later_gauges_to_clean_up_on_shutdown[self.name] = self
+
+
+all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {}
+"""
+Track all `LaterGauge` instances so we can remove any associated hooks during homeserver
+shutdown.
+"""
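The resulting `LaterGauge` lifecycle, as an illustrative sketch (the `hs` and `cache` objects are stand-ins):

```python
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge

cache_size_gauge = LaterGauge(
    name="synapse_example_cache_size",
    desc="",
    labelnames=[SERVER_NAME_LABEL],
)

# During homeserver setup: one hook per process instance.
cache_size_gauge.register_hook(
    homeserver_instance_id=hs.get_instance_id(),
    hook=lambda: {(hs.hostname,): len(cache)},
)

# During homeserver cleanup (e.g. between tests): drop the hook again,
# otherwise stale hooks would keep reporting, and a re-registration for the
# same instance_id would trip the "hook already registered" assertion.
cache_size_gauge.unregister_hooks_for_homeserver_instance_id(hs.get_instance_id())
```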
 # `MetricsEntry` only makes sense when it is a `Protocol`,

@@ -250,7 +316,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
         # Protects access to _registrations
         self._lock = threading.Lock()
 
-        self._register_with_collector()
+        REGISTRY.register(self)
 
     def register(
         self,

@@ -341,14 +407,6 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
             gauge.add_metric(labels=key, value=getattr(metrics, name))
             yield gauge
 
-    def _register_with_collector(self) -> None:
-        if self.name in all_gauges.keys():
-            logger.warning("%s already registered, reregistering", self.name)
-            REGISTRY.unregister(all_gauges.pop(self.name))
-
-        REGISTRY.register(self)
-        all_gauges[self.name] = self
-
 
 class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
     """
synapse/metrics/background_process_metrics.py

@@ -223,10 +223,9 @@ def run_as_background_process(
     This should be used to wrap processes which are fired off to run in the
     background, instead of being associated with a particular request.
 
-    It returns a Deferred which completes when the function completes, but it doesn't
-    follow the synapse logcontext rules, which makes it appropriate for passing to
-    clock.looping_call and friends (or for firing-and-forgetting in the middle of a
-    normal synapse async function).
+    It returns a Deferred which completes when the function completes, which makes it
+    appropriate for passing to clock.looping_call and friends (or for
+    firing-and-forgetting in the middle of a normal synapse async function).
 
     Args:
         desc: a description for this background process type

@@ -241,8 +240,6 @@ def run_as_background_process(
 
     Returns:
         Deferred which returns the result of func, or `None` if func raises.
-        Note that the returned Deferred does not follow the synapse logcontext
-        rules.
     """
 
     async def run() -> Optional[R]:
@@ -237,10 +237,9 @@ def run_as_background_process(
     This should be used to wrap processes which are fired off to run in the
     background, instead of being associated with a particular request.
 
-    It returns a Deferred which completes when the function completes, but it doesn't
-    follow the synapse logcontext rules, which makes it appropriate for passing to
-    clock.looping_call and friends (or for firing-and-forgetting in the middle of a
-    normal synapse async function).
+    It returns a Deferred which completes when the function completes, which makes it
+    appropriate for passing to clock.looping_call and friends (or for
+    firing-and-forgetting in the middle of a normal synapse async function).
 
     Args:
         desc: a description for this background process type

@@ -255,8 +254,6 @@ def run_as_background_process(
 
     Returns:
         Deferred which returns the result of func, or `None` if func raises.
-        Note that the returned Deferred does not follow the synapse logcontext
-        rules.
     """
 
     logger.warning(
synapse/module_api/__init__.py

@@ -1375,9 +1372,7 @@ class ModuleApi:
 
         Args:
             f: The function to call repeatedly. f can be either synchronous or
-                asynchronous, and must follow Synapse's logcontext rules.
-                More info about logcontexts is available at
-                https://element-hq.github.io/synapse/latest/log_contexts.html
+                asynchronous.
             msec: How long to wait between calls in milliseconds.
             *args: Positional arguments to pass to function.
             desc: The background task's description. Defaults to the function's name.

@@ -1431,9 +1426,7 @@ class ModuleApi:
         Args:
             msec: How long to wait before calling, in milliseconds.
             f: The function to call once. f can be either synchronous or
-                asynchronous, and must follow Synapse's logcontext rules.
-                More info about logcontexts is available at
-                https://element-hq.github.io/synapse/latest/log_contexts.html
+                asynchronous.
             *args: Positional arguments to pass to function.
             desc: The background task's description. Defaults to the function's name.
             **kwargs: Keyword arguments to pass to function.

@@ -1668,10 +1661,9 @@ class ModuleApi:
         This should be used to wrap processes which are fired off to run in the
         background, instead of being associated with a particular request.
 
-        It returns a Deferred which completes when the function completes, but it doesn't
-        follow the synapse logcontext rules, which makes it appropriate for passing to
-        clock.looping_call and friends (or for firing-and-forgetting in the middle of a
-        normal synapse async function).
+        It returns a Deferred which completes when the function completes, which makes
+        it appropriate for passing to clock.looping_call and friends (or for
+        firing-and-forgetting in the middle of a normal synapse async function).
 
         Args:
             desc: a description for this background process type

@@ -1686,8 +1678,6 @@ class ModuleApi:
 
         Returns:
             Deferred which returns the result of func, or `None` if func raises.
-            Note that the returned Deferred does not follow the synapse logcontext
-            rules.
         """
         return _run_as_background_process(
             desc, self.server_name, func, *args, bg_start_span=bg_start_span, **kwargs
synapse/notifier.py

@@ -86,6 +86,24 @@ users_woken_by_stream_counter = Counter(
     labelnames=["stream", SERVER_NAME_LABEL],
 )
 
+notifier_listeners_gauge = LaterGauge(
+    name="synapse_notifier_listeners",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+notifier_rooms_gauge = LaterGauge(
+    name="synapse_notifier_rooms",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+notifier_users_gauge = LaterGauge(
+    name="synapse_notifier_users",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
 T = TypeVar("T")
@@ -281,28 +299,20 @@ class Notifier:
             )
         }
 
-        LaterGauge(
-            name="synapse_notifier_listeners",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=count_listeners,
+        notifier_listeners_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(), hook=count_listeners
         )
 
-        LaterGauge(
-            name="synapse_notifier_rooms",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        notifier_rooms_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): count(
                     bool, list(self.room_to_user_streams.values())
                 )
            },
        )
-        LaterGauge(
-            name="synapse_notifier_users",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
+        notifier_users_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self.user_to_user_stream)},
         )
 
     def add_replication_callback(self, cb: Callable[[], None]) -> None:
synapse/replication/tcp/handler.py

@@ -106,6 +106,18 @@ user_ip_cache_counter = Counter(
     "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
 )
 
+tcp_resource_total_connections_gauge = LaterGauge(
+    name="synapse_replication_tcp_resource_total_connections",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+tcp_command_queue_gauge = LaterGauge(
+    name="synapse_replication_tcp_command_queue",
+    desc="Number of inbound RDATA/POSITION commands queued for processing",
+    labelnames=["stream_name", SERVER_NAME_LABEL],
+)
+
 
 # the type of the entries in _command_queues_by_stream
 _StreamCommandQueue = Deque[

@@ -243,11 +255,9 @@ class ReplicationCommandHandler:
         # outgoing replication commands to.)
         self._connections: List[IReplicationConnection] = []
 
-        LaterGauge(
-            name="synapse_replication_tcp_resource_total_connections",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self._connections)},
+        tcp_resource_total_connections_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self._connections)},
         )
 
         # When POSITION or RDATA commands arrive, we stick them in a queue and process

@@ -266,11 +276,9 @@ class ReplicationCommandHandler:
         # from that connection.
         self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
 
-        LaterGauge(
-            name="synapse_replication_tcp_command_queue",
-            desc="Number of inbound RDATA/POSITION commands queued for processing",
-            labelnames=["stream_name", SERVER_NAME_LABEL],
-            caller=lambda: {
+        tcp_command_queue_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (stream_name, self.server_name): len(queue)
                 for stream_name, queue in self._command_queues_by_stream.items()
             },
synapse/replication/tcp/protocol.py

@@ -527,7 +527,10 @@ pending_commands = LaterGauge(
     name="synapse_replication_tcp_protocol_pending_commands",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-    caller=lambda: {
+)
+pending_commands.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: {
         (p.name, p.server_name): len(p.pending_commands) for p in connected_connections
     },
 )

@@ -544,7 +547,10 @@ transport_send_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_send_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-    caller=lambda: {
+)
+transport_send_buffer.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: {
         (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
     },
 )

@@ -571,7 +577,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-    caller=lambda: {
+)
+tcp_transport_kernel_send_buffer.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: {
         (p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
     },

@@ -582,7 +591,10 @@ tcp_transport_kernel_read_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-    caller=lambda: {
+)
+tcp_transport_kernel_read_buffer.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: {
         (p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
     },
synapse/server.py

@@ -129,7 +129,10 @@ from synapse.http.client import (
 )
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.media.media_repository import MediaRepository
-from synapse.metrics import register_threadpool
+from synapse.metrics import (
+    all_later_gauges_to_clean_up_on_shutdown,
+    register_threadpool,
+)
 from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
 from synapse.module_api import ModuleApi
 from synapse.module_api.callbacks import ModuleApiCallbacks

@@ -363,11 +366,36 @@ class HomeServer(metaclass=abc.ABCMeta):
         self.datastores = Databases(self.DATASTORE_CLASS, self)
         logger.info("Finished setting up.")
 
-        # Register background tasks required by this server. This must be done
-        # somewhat manually due to the background tasks not being registered
-        # unless handlers are instantiated.
-        if self.config.worker.run_background_tasks:
-            self.setup_background_tasks()
+    def __del__(self) -> None:
+        """
+        Called when the homeserver is garbage collected.
+
+        Make sure we actually do some clean-up, rather than leak data.
+        """
+        self.cleanup()
+
+    def cleanup(self) -> None:
+        """
+        WIP: Clean up any references to the homeserver and stop any running related
+        processes, timers, loops, replication streams, etc.
+
+        This should be called wherever you care about the HomeServer being completely
+        garbage collected, like in tests. It's not necessary to call it if you plan to
+        just shut down the whole Python process anyway.
+
+        Can be called multiple times.
+        """
+        logger.info("Received cleanup request for %s.", self.hostname)
+
+        # TODO: Stop background processes, timers, loops, replication stream, etc.
+
+        # Clean up metrics associated with the homeserver
+        for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values():
+            later_gauge.unregister_hooks_for_homeserver_instance_id(
+                self.get_instance_id()
+            )
+
+        logger.info("Cleanup complete for %s.", self.hostname)
 
     def start_listening(self) -> None:  # noqa: B027 (no-op by design)
         """Start the HTTP, manhole, metrics, etc listeners

@@ -376,7 +404,7 @@ class HomeServer(metaclass=abc.ABCMeta):
         appropriate listeners.
         """
 
-    def setup_background_tasks(self) -> None:
+    def start_background_tasks(self) -> None:
         """
         Some handlers have side effects on instantiation (like registering
         background updates). This function causes them to be fetched, and
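How `cleanup()` might be exercised from a test, as an illustrative sketch (the `make_homeserver` harness is an assumption):

```python
def test_homeservers_can_be_recreated(make_homeserver) -> None:
    hs1 = make_homeserver("one.example.com")
    # Without cleanup, hs1's LaterGauge hooks would linger and keep emitting
    # samples for a homeserver that no longer exists.
    hs1.cleanup()

    # hs2 registers its own hooks against the same module-level gauges.
    hs2 = make_homeserver("two.example.com")
```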
@@ -61,7 +61,7 @@ from synapse.logging.context import (
     current_context,
     make_deferred_yieldable,
 )
-from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool
+from synapse.metrics import SERVER_NAME_LABEL, register_threadpool
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.background_updates import BackgroundUpdater
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -611,12 +611,6 @@ class DatabasePool:
         )
 
         self.updates = BackgroundUpdater(hs, self)
-        LaterGauge(
-            name="synapse_background_update_status",
-            desc="Background update status",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): self.updates.get_status()},
-        )
 
         self._previous_txn_total_time = 0.0
         self._current_txn_total_time = 0.0
@@ -22,6 +22,7 @@
 import logging
 from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar
 
+from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, make_conn
 from synapse.storage.databases.main.events import PersistEventsStore
@@ -40,6 +41,13 @@ logger = logging.getLogger(__name__)
 DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True)
 
 
+background_update_status = LaterGauge(
+    name="synapse_background_update_status",
+    desc="Background update status",
+    labelnames=["database_name", SERVER_NAME_LABEL],
+)
+
+
 class Databases(Generic[DataStoreT]):
     """The various databases.
 
@@ -143,6 +151,15 @@ class Databases(Generic[DataStoreT]):
 
             db_conn.close()
 
+        # Track the background update status for each database
+        background_update_status.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
+                (database.name(), server_name): database.updates.get_status()
+                for database in self.databases
+            },
+        )
+
        # Sanity check that we have actually configured all the required stores.
        if not main:
            raise Exception("No 'main' database configured")
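The hook's return value is a mapping from label-value tuples to samples, one entry per `(database_name, server_name)` pair, matching `labelnames=["database_name", SERVER_NAME_LABEL]`. A runnable sketch of that return shape (the stand-in data and the numeric status encoding are assumptions for illustration; the real `get_status()` may encode differently):

```python
# Stand-in data: two logical databases on one homeserver.
databases = {"main": "running", "state": "complete"}
server_name = "example.com"


def background_update_hook() -> dict:
    # Label tuple -> sample value, one entry per database.
    status_to_value = {"complete": 0.0, "running": 1.0}  # assumed encoding
    return {
        (db_name, server_name): status_to_value[status]
        for db_name, status in databases.items()
    }


print(background_update_hook())
# {('main', 'example.com'): 1.0, ('state', 'example.com'): 0.0}
```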
@@ -84,6 +84,13 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
 _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
 
 
+federation_known_servers_gauge = LaterGauge(
+    name="synapse_federation_known_servers",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+
 @attr.s(frozen=True, slots=True, auto_attribs=True)
 class EventIdMembership:
     """Returned by `get_membership_from_event_ids`"""
@@ -116,11 +123,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             1,
             self._count_known_servers,
         )
-        LaterGauge(
-            name="synapse_federation_known_servers",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): self._known_servers_count},
+        federation_known_servers_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): self._known_servers_count},
         )
 
     @wrap_as_background_process("_count_known_servers")
@@ -84,9 +84,6 @@ class AbstractObservableDeferred(Generic[_T], metaclass=abc.ABCMeta):
         This returns a brand new deferred that is resolved when the underlying
         deferred is resolved. Interacting with the returned deferred does not
         effect the underlying deferred.
-
-        Note that the returned Deferred doesn't follow the Synapse logcontext rules -
-        you will probably want to `make_deferred_yieldable` it.
         """
         ...
 
@@ -100,11 +97,6 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
 
     Cancelling or otherwise resolving an observer will not affect the original
     ObservableDeferred.
-
-    NB that it does not attempt to do anything with logcontexts; in general
-    you should probably make_deferred_yieldable the deferreds
-    returned by `observe`, and ensure that the original deferred runs its
-    callbacks in the sentinel logcontext.
     """
 
     __slots__ = ["_deferred", "_observers", "_result"]
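For context, the `observe` pattern these docstring changes describe: one underlying `Deferred`, many independent observers. A rough usage sketch (treating `ObservableDeferred` as the class documented above):

```python
from twisted.internet import defer

from synapse.util.async_helpers import ObservableDeferred

underlying: "defer.Deferred[str]" = defer.Deferred()
observable = ObservableDeferred(underlying)

# Each observer is a fresh Deferred; cancelling or resolving one does not
# affect the others or the underlying Deferred.
first = observable.observe()
second = observable.observe()

underlying.callback("result")
# Both `first` and `second` now fire with "result".
```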
@@ -861,16 +853,12 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
     """Prevent a `Deferred` from being cancelled by wrapping it in another `Deferred`.
 
     Args:
-        deferred: The `Deferred` to protect against cancellation. Must not follow the
-            Synapse logcontext rules.
+        deferred: The `Deferred` to protect against cancellation.
 
     Returns:
         A new `Deferred`, which will contain the result of the original `Deferred`.
         The new `Deferred` will not propagate cancellation through to the original.
         When cancelled, the new `Deferred` will fail with a `CancelledError`.
-
-        The new `Deferred` will not follow the Synapse logcontext rules and should be
-        wrapped with `make_deferred_yieldable`.
     """
     new_deferred: "defer.Deferred[T]" = defer.Deferred()
     deferred.chainDeferred(new_deferred)
@@ -896,8 +884,7 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
     resolve with a `CancelledError` until the original awaitable resolves.
 
     Args:
-        deferred: The coroutine or `Deferred` to protect against cancellation. May
-            optionally follow the Synapse logcontext rules.
+        deferred: The coroutine or `Deferred` to protect against cancellation.
 
     Returns:
         A new `Deferred`, which will contain the result of the original coroutine or
@@ -906,10 +893,6 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
 
         When cancelled, the new `Deferred` will wait until the original coroutine or
         `Deferred` resolves before failing with a `CancelledError`.
-
-        The new `Deferred` will follow the Synapse logcontext rules if `awaitable`
-        follows the Synapse logcontext rules. Otherwise the new `Deferred` should be
-        wrapped with `make_deferred_yieldable`.
     """
 
     # First, convert the awaitable into a `Deferred`.
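A quick usage sketch of `delay_cancellation` as documented above: the wrapper absorbs a cancel until the inner work finishes (the coroutine names here are illustrative, not from the codebase):

```python
from synapse.util.async_helpers import delay_cancellation


async def critical_section() -> str:
    # Illustrative work that must run to completion even if the caller
    # gives up waiting for it.
    return "done"


async def handler() -> str:
    # If `handler` is cancelled, `critical_section` still runs to completion;
    # only then does the wrapper fail with CancelledError.
    return await delay_cancellation(critical_section())
```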
@@ -295,9 +295,6 @@ class DeferredCache(Generic[KT, VT]):
         *original* `value`, (c) any future calls to `get()` will complete with the
         result from the *new* `value`.
-
-        It is expected that `value` does *not* follow the synapse logcontext rules - ie,
-        if it is incomplete, it runs its callbacks in the sentinel context.
 
         Args:
             key: Key to be set
             value: a deferred which will complete with a result to add to the cache
@@ -234,11 +234,9 @@ class ResponseCache(Generic[KV]):
     ) -> RV:
         """Wrap together a *get* and *set* call, taking care of logcontexts
 
-        First looks up the key in the cache, and if it is present makes it
-        follow the synapse logcontext rules and returns it.
+        First looks up the key in the cache, and if present, returns it.
 
-        Otherwise, makes a call to *callback(*args, **kwargs)*, which should
-        follow the synapse logcontext rules, and adds the result to the cache.
+        Otherwise, makes a call to *callback(*args, **kwargs)* and adds the result to the cache.
 
         Example usage:
 
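The "Example usage" in that docstring is cut off by the hunk; the well-known shape of a `ResponseCache.wrap` call looks roughly like this (the constructor arguments and handler names are assumptions of this sketch, not the elided example):

```python
from synapse.util.caches.response_cache import ResponseCache


class ExampleHandler:
    def __init__(self, hs) -> None:
        # Assumed construction; the real signature may differ.
        self.response_cache = ResponseCache(hs.get_clock(), "example_cache")

    async def on_request(self, key, request):
        # Concurrent calls sharing the same key share one execution of the
        # callback; later calls get the cached result.
        return await self.response_cache.wrap(key, self._do_work, request)

    async def _do_work(self, request):
        ...
```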
@@ -1,250 +0,0 @@
-#
-# This file is licensed under the Affero General Public License (AGPL) version 3.
-#
-# Copyright (C) 2023 New Vector, Ltd
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# See the GNU Affero General Public License for more details:
-# <https://www.gnu.org/licenses/agpl-3.0.html>.
-#
-# Originally licensed under the Apache License, Version 2.0:
-# <http://www.apache.org/licenses/LICENSE-2.0>.
-#
-# [This file includes modifications made by New Vector Limited]
-#
-#
-
-import functools
-import sys
-from types import GeneratorType
-from typing import Any, Callable, Generator, List, TypeVar, cast
-
-from typing_extensions import ParamSpec
-
-from twisted.internet import defer
-from twisted.internet.defer import Deferred
-from twisted.python.failure import Failure
-
-# Tracks if we've already patched inlineCallbacks
-_already_patched = False
-
-
-T = TypeVar("T")
-P = ParamSpec("P")
-
-
-def do_patch() -> None:
-    """
-    Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit
-    """
-
-    from synapse.logging.context import current_context
-
-    global _already_patched
-
-    orig_inline_callbacks = defer.inlineCallbacks
-    if _already_patched:
-        return
-
-    def new_inline_callbacks(
-        f: Callable[P, Generator["Deferred[object]", object, T]],
-    ) -> Callable[P, "Deferred[T]"]:
-        @functools.wraps(f)
-        def wrapped(*args: P.args, **kwargs: P.kwargs) -> "Deferred[T]":
-            start_context = current_context()
-            changes: List[str] = []
-            orig: Callable[P, "Deferred[T]"] = orig_inline_callbacks(
-                _check_yield_points(f, changes)
-            )
-
-            try:
-                res: "Deferred[T]" = orig(*args, **kwargs)
-            except Exception:
-                if current_context() != start_context:
-                    for err in changes:
-                        print(err, file=sys.stderr)
-
-                    err = "%s changed context from %s to %s on exception" % (
-                        f,
-                        start_context,
-                        current_context(),
-                    )
-                    print(err, file=sys.stderr)
-                    raise Exception(err)
-                raise
-
-            if not isinstance(res, Deferred) or res.called:
-                if current_context() != start_context:
-                    for err in changes:
-                        print(err, file=sys.stderr)
-
-                    err = "Completed %s changed context from %s to %s" % (
-                        f,
-                        start_context,
-                        current_context(),
-                    )
-                    # print the error to stderr because otherwise all we
-                    # see in travis-ci is the 500 error
-                    print(err, file=sys.stderr)
-                    raise Exception(err)
-                return res
-
-            if current_context():
-                err = (
-                    "%s returned incomplete deferred in non-sentinel context "
-                    "%s (start was %s)"
-                ) % (f, current_context(), start_context)
-                print(err, file=sys.stderr)
-                raise Exception(err)
-
-            def check_ctx(r: T) -> T:
-                if current_context() != start_context:
-                    for err in changes:
-                        print(err, file=sys.stderr)
-                    err = "%s completion of %s changed context from %s to %s" % (
-                        "Failure" if isinstance(r, Failure) else "Success",
-                        f,
-                        start_context,
-                        current_context(),
-                    )
-                    print(err, file=sys.stderr)
-                    raise Exception(err)
-                return r
-
-            res.addBoth(check_ctx)
-            return res
-
-        return wrapped
-
-    defer.inlineCallbacks = new_inline_callbacks
-    _already_patched = True
-
-
-def _check_yield_points(
-    f: Callable[P, Generator["Deferred[object]", object, T]],
-    changes: List[str],
-) -> Callable:
-    """Wraps a generator that is about to be passed to defer.inlineCallbacks
-    checking that after every yield the log contexts are correct.
-
-    It's perfectly valid for log contexts to change within a function, e.g. due
-    to new Measure blocks, so such changes are added to the given `changes`
-    list instead of triggering an exception.
-
-    Args:
-        f: generator function to wrap
-        changes: A list of strings detailing how the contexts
-            changed within a function.
-
-    Returns:
-        function
-    """
-
-    from synapse.logging.context import current_context
-
-    @functools.wraps(f)
-    def check_yield_points_inner(
-        *args: P.args, **kwargs: P.kwargs
-    ) -> Generator["Deferred[object]", object, T]:
-        gen = f(*args, **kwargs)
-
-        # We only patch if we have a native generator function, as we rely on
-        # `gen.gi_frame`.
-        if not isinstance(gen, GeneratorType):
-            ret = yield from gen
-            return ret
-
-        last_yield_line_no = gen.gi_frame.f_lineno
-        result: Any = None
-        while True:
-            expected_context = current_context()
-
-            try:
-                isFailure = isinstance(result, Failure)
-                if isFailure:
-                    d = result.throwExceptionIntoGenerator(gen)
-                else:
-                    d = gen.send(result)
-            except StopIteration as e:
-                if current_context() != expected_context:
-                    # This happens when the context is lost sometime *after* the
-                    # final yield and returning. E.g. we forgot to yield on a
-                    # function that returns a deferred.
-                    #
-                    # We don't raise here as it's perfectly valid for contexts to
-                    # change in a function, as long as it sets the correct context
-                    # on resolving (which is checked separately).
-                    err = (
-                        "Function %r returned and changed context from %s to %s,"
-                        " in %s between %d and end of func"
-                        % (
-                            f.__qualname__,
-                            expected_context,
-                            current_context(),
-                            f.__code__.co_filename,
-                            last_yield_line_no,
-                        )
-                    )
-                    changes.append(err)
-                # The `StopIteration` contains the return value from the
-                # generator.
-                return cast(T, e.value)
-
-            frame = gen.gi_frame
-
-            if isinstance(d, defer.Deferred) and not d.called:
-                # This happens if we yield on a deferred that doesn't follow
-                # the log context rules without wrapping in a `make_deferred_yieldable`.
-                # We raise here as this should never happen.
-                if current_context():
-                    err = (
-                        "%s yielded with context %s rather than sentinel,"
-                        " yielded on line %d in %s"
-                        % (
-                            frame.f_code.co_name,
-                            current_context(),
-                            frame.f_lineno,
-                            frame.f_code.co_filename,
-                        )
-                    )
-                    raise Exception(err)
-
-            # the wrapped function yielded a Deferred: yield it back up to the parent
-            # inlineCallbacks().
-            try:
-                result = yield d
-            except Exception:
-                # this will fish an earlier Failure out of the stack where possible, and
-                # thus is preferable to passing in an exception to the Failure
-                # constructor, since it results in less stack-mangling.
-                result = Failure()
-
-            if current_context() != expected_context:
-                # This happens because the context is lost sometime *after* the
-                # previous yield and *after* the current yield. E.g. the
-                # deferred we waited on didn't follow the rules, or we forgot to
-                # yield on a function between the two yield points.
-                #
-                # We don't raise here as its perfectly valid for contexts to
-                # change in a function, as long as it sets the correct context
-                # on resolving (which is checked separately).
-                err = (
-                    "%s changed context from %s to %s, happened between lines %d and %d in %s"
-                    % (
-                        frame.f_code.co_name,
-                        expected_context,
-                        current_context(),
-                        last_yield_line_no,
-                        frame.f_lineno,
-                        frame.f_code.co_filename,
-                    )
-                )
-                changes.append(err)
-
-            last_yield_line_no = frame.f_lineno
-
-    return check_yield_points_inner
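For reference, the technique this deleted file implemented is a process-wide decorator swap: replace `defer.inlineCallbacks` with a wrapper that asserts on logcontext state around the wrapped generator. A condensed sketch of that shape, not the removed code itself (`current_context` is assumed from `synapse.logging.context` as used above):

```python
import functools

from twisted.internet import defer

from synapse.logging.context import current_context

_original = defer.inlineCallbacks


def patched_inline_callbacks(f):
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        # Capture the context before running, compare afterwards, and
        # complain loudly if the wrapped code corrupted it.
        before = current_context()
        result = _original(f)(*args, **kwargs)
        if current_context() != before:
            raise Exception("logcontext changed across inlineCallbacks")
        return result

    return wrapped


defer.inlineCallbacks = patched_inline_callbacks  # process-wide monkey-patch
```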
@@ -131,22 +131,28 @@ def _get_counts_from_rate_limiter_instance(
 # We track the number of affected hosts per time-period so we can
 # differentiate one really noisy homeserver from a general
 # ratelimit tuning problem across the federation.
-LaterGauge(
+sleep_affected_hosts_gauge = LaterGauge(
     name="synapse_rate_limit_sleep_affected_hosts",
     desc="Number of hosts that had requests put to sleep",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-    caller=lambda: _get_counts_from_rate_limiter_instance(
+)
+sleep_affected_hosts_gauge.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_sleep()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
         )
     ),
 )
-LaterGauge(
+reject_affected_hosts_gauge = LaterGauge(
     name="synapse_rate_limit_reject_affected_hosts",
     desc="Number of hosts that had requests rejected",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-    caller=lambda: _get_counts_from_rate_limiter_instance(
+)
+reject_affected_hosts_gauge.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_reject()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
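Two registration modes appear across these hunks: `homeserver_instance_id=None` for process-global hooks (like the module-level rate-limiter gauges here) and `hs.get_instance_id()` for hooks tied to one homeserver's lifetime. A runnable toy of why that distinction matters for cleanup (the registry shape is assumed from the diff, not from `synapse.metrics`):

```python
from typing import Callable, Dict, Optional


class SketchGauge:
    """Assumed shape of LaterGauge's per-instance hook registry."""

    def __init__(self) -> None:
        self._hooks: Dict[Optional[str], Callable[[], dict]] = {}

    def register_hook(
        self, homeserver_instance_id: Optional[str], hook: Callable[[], dict]
    ) -> None:
        self._hooks[homeserver_instance_id] = hook

    def unregister_hooks_for_homeserver_instance_id(self, instance_id: str) -> None:
        # Only hooks registered under this instance id are dropped; hooks
        # registered under None (process-global) keep reporting.
        self._hooks.pop(instance_id, None)


gauge = SketchGauge()
gauge.register_hook(homeserver_instance_id=None, hook=lambda: {("global",): 1})
gauge.register_hook(homeserver_instance_id="hs-1", hook=lambda: {("hs1",): 2})
gauge.unregister_hooks_for_homeserver_instance_id("hs-1")
assert list(gauge._hooks) == [None]  # the global hook survives cleanup
```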
@@ -44,6 +44,13 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+running_tasks_gauge = LaterGauge(
+    name="synapse_scheduler_running_tasks",
+    desc="The number of concurrent running tasks handled by the TaskScheduler",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+
 class TaskScheduler:
     """
     This is a simple task scheduler designed for resumable tasks. Normally,
@@ -130,11 +137,9 @@ class TaskScheduler:
             TaskScheduler.SCHEDULE_INTERVAL_MS,
         )
 
-        LaterGauge(
-            name="synapse_scheduler_running_tasks",
-            desc="The number of concurrent running tasks handled by the TaskScheduler",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self._running_tasks)},
+        running_tasks_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self._running_tasks)},
         )
 
     def register_action(
@@ -21,9 +21,4 @@
 
 from twisted.trial import util
 
-from synapse.util.patch_inline_callbacks import do_patch
-
-# attempt to do the patch before we load any synapse code
-do_patch()
-
 util.DEFAULT_TIMEOUT_DURATION = 20
@@ -18,11 +18,18 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
-from typing import Dict, Protocol, Tuple
+from typing import Dict, NoReturn, Protocol, Tuple
 
 from prometheus_client.core import Sample
 
-from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
+from synapse.metrics import (
+    REGISTRY,
+    SERVER_NAME_LABEL,
+    InFlightGauge,
+    LaterGauge,
+    all_later_gauges_to_clean_up_on_shutdown,
+    generate_latest,
+)
 from synapse.util.caches.deferred_cache import DeferredCache
 
 from tests import unittest
@@ -285,6 +292,95 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
         self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
 
 
+class LaterGaugeTests(unittest.HomeserverTestCase):
+    def setUp(self) -> None:
+        super().setUp()
+        self.later_gauge = LaterGauge(
+            name="foo",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+        )
+
+    def tearDown(self) -> None:
+        super().tearDown()
+
+        REGISTRY.unregister(self.later_gauge)
+        all_later_gauges_to_clean_up_on_shutdown.pop(self.later_gauge.name, None)
+
+    def test_later_gauge_multiple_servers(self) -> None:
+        """
+        Test that LaterGauge metrics are reported correctly across multiple servers. We
+        will have a metrics entry for each homeserver that is labeled with the
+        `server_name` label.
+        """
+        self.later_gauge.register_hook(
+            homeserver_instance_id="123", hook=lambda: {("hs1",): 1}
+        )
+        self.later_gauge.register_hook(
+            homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
+        )
+
+        metrics_map = get_latest_metrics()
+
+        # Find the metrics from both homeservers
+        hs1_metric = 'foo{server_name="hs1"}'
+        hs1_metric_value = metrics_map.get(hs1_metric)
+        self.assertIsNotNone(
+            hs1_metric_value,
+            f"Missing metric {hs1_metric} in metrics {metrics_map}",
+        )
+        self.assertEqual(hs1_metric_value, "1.0")
+
+        hs2_metric = 'foo{server_name="hs2"}'
+        hs2_metric_value = metrics_map.get(hs2_metric)
+        self.assertIsNotNone(
+            hs2_metric_value,
+            f"Missing metric {hs2_metric} in metrics {metrics_map}",
+        )
+        self.assertEqual(hs2_metric_value, "2.0")
+
+    def test_later_gauge_hook_exception(self) -> None:
+        """
+        Test that LaterGauge metrics are collected across multiple servers even if one
+        hook is throwing an exception.
+        """
+
+        def raise_exception() -> NoReturn:
+            raise Exception("fake error generating data")
+
+        # Make the hook for hs1 throw an exception
+        self.later_gauge.register_hook(
+            homeserver_instance_id="123", hook=raise_exception
+        )
+        # Metrics from hs2 still work fine
+        self.later_gauge.register_hook(
+            homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
+        )
+
+        metrics_map = get_latest_metrics()
+
+        # Since we encountered an exception while trying to collect metrics from hs1, we
+        # don't expect to see it here.
+        hs1_metric = 'foo{server_name="hs1"}'
+        hs1_metric_value = metrics_map.get(hs1_metric)
+        self.assertIsNone(
+            hs1_metric_value,
+            (
+                "Since we encountered an exception while trying to collect metrics from hs1, "
+                f"we don't expect to see it in the metrics_map {metrics_map}"
+            ),
+        )
+
+        # We should still see metrics from hs2 though
+        hs2_metric = 'foo{server_name="hs2"}'
+        hs2_metric_value = metrics_map.get(hs2_metric)
+        self.assertIsNotNone(
+            hs2_metric_value,
+            f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
+        )
+        self.assertEqual(hs2_metric_value, "2.0")
+
+
 def get_latest_metrics() -> Dict[str, str]:
     """
     Collect the latest metrics from the registry and parse them into an easy to use map.
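The `get_latest_metrics` helper is truncated by the hunk; per its docstring it parses the Prometheus exposition text into a `{sample_name_with_labels: value}` map. A rough equivalent, with the parsing details assumed since the body is cut off here:

```python
from synapse.metrics import REGISTRY, generate_latest


def sketch_get_latest_metrics() -> dict:
    metrics_map = {}
    for line in generate_latest(REGISTRY).decode("utf-8").splitlines():
        # Skip blank lines and metadata such as `# HELP` / `# TYPE`.
        if not line or line.startswith("#"):
            continue
        # Exposition lines look like: foo{server_name="hs1"} 1.0
        name_and_labels, _, value = line.rpartition(" ")
        metrics_map[name_and_labels] = value
    return metrics_map
```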
@@ -32,7 +32,6 @@ from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocat
 from synapse.http.site import SynapseRequest, SynapseSite
 from synapse.replication.http import ReplicationRestResource
 from synapse.replication.tcp.client import ReplicationDataHandler
-from synapse.replication.tcp.handler import ReplicationCommandHandler
 from synapse.replication.tcp.protocol import (
     ClientReplicationStreamProtocol,
     ServerReplicationStreamProtocol,
@@ -97,7 +96,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
         self.test_handler = self._build_replication_data_handler()
         self.worker_hs._replication_data_handler = self.test_handler  # type: ignore[attr-defined]
 
-        repl_handler = ReplicationCommandHandler(self.worker_hs)
+        repl_handler = self.worker_hs.get_replication_command_handler()
         self.client = ClientReplicationStreamProtocol(
             self.worker_hs,
             "client",
@@ -1145,6 +1145,9 @@ def setup_test_homeserver(
         reactor=reactor,
     )
 
+    # Register the cleanup hook
+    cleanup_func(hs.cleanup)
+
     # Install @cache_in_self attributes
     for key, val in kwargs.items():
         setattr(hs, "_" + key, val)
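`cleanup_func` here is the callback a test case passes into `setup_test_homeserver` so that teardown can run deferred clean-ups. Schematically, with the harness details assumed:

```python
cleanups = []


def cleanup_func(fn) -> None:
    # Collected while setup_test_homeserver(...) runs.
    cleanups.append(fn)


# ... the test body runs ...

# At tearDown the harness drains the list, which now includes hs.cleanup,
# unregistering the homeserver's LaterGauge hooks.
for fn in reversed(cleanups):
    fn()
```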
@@ -355,7 +355,7 @@ class DescriptorTestCase(unittest.TestCase):
         d = obj.fn(1)
         self.assertEqual(
             current_context(),
-            SENTINEL_CONTEXT,
+            c1,
         )
         yield d
         self.fail("No exception thrown")
@@ -849,7 +849,7 @@ class CachedListDescriptorTestCase(unittest.TestCase):
 
         # start the lookup off
         d1 = obj.list_fn([10, 20], 2)
-        self.assertEqual(current_context(), SENTINEL_CONTEXT)
+        self.assertEqual(current_context(), c1)
         r = yield d1
         self.assertEqual(current_context(), c1)
         obj.mock.assert_called_once_with({10, 20}, 2)
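These two test updates expect the calling context (`c1`) to remain current after kicking off a cached lookup, where the old tests expected the sentinel. A toy illustration of the isolation a `ContextVar` gives (stdlib `contextvars`, not Synapse's `LoggingContext`):

```python
import contextvars

request_context = contextvars.ContextVar("request_context", default="sentinel")


def handle() -> str:
    # Mutates the var inside a copied context only.
    request_context.set("inner")
    return request_context.get()


request_context.set("c1")

# Run `handle` in a copy of the current context: its mutation is isolated,
# so the caller still sees "c1" afterwards rather than the sentinel.
result = contextvars.copy_context().run(handle)
assert result == "inner"
assert request_context.get() == "c1"
```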