mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
9 Commits
travis/hma
...
madlittlem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6ce2f3e59d | ||
|
|
93044f4c5b | ||
|
|
c7a80b63ec | ||
|
|
1f384b0e21 | ||
|
|
3e66e0a1b8 | ||
|
|
0c8759bbb6 | ||
|
|
4303879cfe | ||
|
|
3742b3b3fb | ||
|
|
224cb3f827 |
1
changelog.d/18871.misc
Normal file
1
changelog.d/18871.misc
Normal file
@@ -0,0 +1 @@
|
||||
Store the `LoggingContext` in a `ContextVar` instead of a thread-local variable.
|
||||
@@ -1,12 +1,12 @@
|
||||
# Log Contexts
|
||||
|
||||
To help track the processing of individual requests, synapse uses a
|
||||
'`log context`' to track which request it is handling at any given
|
||||
moment. This is done via a thread-local variable; a `logging.Filter` is
|
||||
then used to fish the information back out of the thread-local variable
|
||||
`LoggingContext` to track which request it is handling at any given
|
||||
moment. This is done via a `ContextVar` variable; a `logging.Filter` is
|
||||
then used to fish the information back out of the `ContextVar` variable
|
||||
and add it to each log record.
|
||||
|
||||
Logcontexts are also used for CPU and database accounting, so that we
|
||||
Log contexts are also used for CPU and database accounting, so that we
|
||||
can track which requests were responsible for high CPU use or database
|
||||
activity.
|
||||
|
||||
@@ -14,18 +14,11 @@ The `synapse.logging.context` module provides facilities for managing
|
||||
the current log context (as well as providing the `LoggingContextFilter`
|
||||
class).
|
||||
|
||||
Asynchronous functions make the whole thing complicated, so this document describes
|
||||
how it all works, and how to write code which follows the rules.
|
||||
|
||||
In this document, "awaitable" refers to any object which can be `await`ed. In the context of
|
||||
Synapse, that normally means either a coroutine or a Twisted
|
||||
In this document, "awaitable" refers to any object which can be `await`ed. In the
|
||||
context of Synapse, that normally means either a coroutine or a Twisted
|
||||
[`Deferred`](https://twistedmatrix.com/documents/current/api/twisted.internet.defer.Deferred.html).
|
||||
|
||||
## Logcontexts without asynchronous code
|
||||
|
||||
In the absence of any asynchronous voodoo, things are simple enough. As with
|
||||
any code of this nature, the rule is that our function should leave
|
||||
things as it found them:
|
||||
## Basic usage
|
||||
|
||||
```python
|
||||
from synapse.logging import context # omitted from future snippets
|
||||
@@ -45,7 +38,7 @@ def do_request_handling():
|
||||
logger.debug("phew") # this will be logged against request_id
|
||||
```
|
||||
|
||||
LoggingContext implements the context management methods, so the above
|
||||
`LoggingContext` implements the context management methods, so the above
|
||||
can be written much more succinctly as:
|
||||
|
||||
```python
|
||||
@@ -59,197 +52,76 @@ def do_request_handling():
|
||||
logger.debug("phew")
|
||||
```
|
||||
|
||||
## Using logcontexts with awaitables
|
||||
### The `sentinel` context
|
||||
|
||||
Awaitables break the linear flow of code so that there is no longer a single entry point
|
||||
where we should set the logcontext and a single exit point where we should remove it.
|
||||
The default context is `context.SENTINEL_CONTEXT`, which is a sentinel value to
|
||||
represent the root context. This is what is used when there is no other context set.
|
||||
|
||||
Consider the example above, where `do_request_handling` needs to do some
|
||||
blocking operation, and returns an awaitable:
|
||||
No CPU/database usage metrics are recorded against the `sentinel` context.
|
||||
|
||||
Ideally, nothing from the Synapse homeserver would be logged against the `sentinel`
|
||||
context as we want to know where the logs came from. In practice, this is not always the
|
||||
case yet especially outside of request handling.
|
||||
|
||||
Previously, the `sentinel` context played a bigger role when we had to carefully deal
|
||||
with thread-local storage; as we had to make sure to not leak another context to another
|
||||
task after we gave up control to the reactor so we set the
|
||||
|
||||
|
||||
|
||||
### `PreserveLoggingContext`
|
||||
|
||||
In a similar vein of no longer as relevant, `PreserveLoggingContext` is another context
|
||||
manager helper and a little bit of syntactic sugar to set the current log context
|
||||
(without finishing it) and restore the previous context on exit.
|
||||
|
||||
```python
|
||||
async def handle_request(request_id):
|
||||
with context.LoggingContext() as request_context:
|
||||
request_context.request = request_id
|
||||
await do_request_handling()
|
||||
import logging
|
||||
from synapse.logging.context import LoggingContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def main() -> None:
|
||||
with context.LoggingContext("main"):
|
||||
task_context = context.LoggingContext("task")
|
||||
|
||||
with task_context:
|
||||
logger.debug("foo")
|
||||
|
||||
# Bad: will throw an error because `task_context` is already finished
|
||||
with task_context:
|
||||
logger.debug("bar")
|
||||
|
||||
logger.debug("finished")
|
||||
```
|
||||
|
||||
In the above flow:
|
||||
|
||||
- The logcontext is set
|
||||
- `do_request_handling` is called, and returns an awaitable
|
||||
- `handle_request` awaits the awaitable
|
||||
- Execution of `handle_request` is suspended
|
||||
|
||||
So we have stopped processing the request (and will probably go on to
|
||||
start processing the next), without clearing the logcontext.
|
||||
|
||||
To circumvent this problem, synapse code assumes that, wherever you have
|
||||
an awaitable, you will want to `await` it. To that end, wherever
|
||||
functions return awaitables, we adopt the following conventions:
|
||||
|
||||
**Rules for functions returning awaitables:**
|
||||
|
||||
> - If the awaitable is already complete, the function returns with the
|
||||
> same logcontext it started with.
|
||||
> - If the awaitable is incomplete, the function clears the logcontext
|
||||
> before returning; when the awaitable completes, it restores the
|
||||
> logcontext before running any callbacks.
|
||||
|
||||
That sounds complicated, but actually it means a lot of code (including
|
||||
the example above) "just works". There are two cases:
|
||||
|
||||
- If `do_request_handling` returns a completed awaitable, then the
|
||||
logcontext will still be in place. In this case, execution will
|
||||
continue immediately after the `await`; the "finished" line will
|
||||
be logged against the right context, and the `with` block restores
|
||||
the original context before we return to the caller.
|
||||
- If the returned awaitable is incomplete, `do_request_handling` clears
|
||||
the logcontext before returning. The logcontext is therefore clear
|
||||
when `handle_request` `await`s the awaitable.
|
||||
|
||||
Once `do_request_handling`'s awaitable completes, it will reinstate
|
||||
the logcontext, before running the second half of `handle_request`,
|
||||
so again the "finished" line will be logged against the right context,
|
||||
and the `with` block restores the original context.
|
||||
|
||||
As an aside, it's worth noting that `handle_request` follows our rules
|
||||
- though that only matters if the caller has its own logcontext which it
|
||||
cares about.
|
||||
|
||||
The following sections describe pitfalls and helpful patterns when
|
||||
implementing these rules.
|
||||
|
||||
Always await your awaitables
|
||||
----------------------------
|
||||
|
||||
Whenever you get an awaitable back from a function, you should `await` on
|
||||
it as soon as possible. Do not pass go; do not do any logging; do not
|
||||
call any other functions.
|
||||
This can be fixed by using `PreserveLoggingContext`:
|
||||
|
||||
```python
|
||||
async def fun():
|
||||
logger.debug("starting")
|
||||
await do_some_stuff() # just like this
|
||||
import logging
|
||||
from synapse.logging.context import LoggingContext
|
||||
|
||||
coro = more_stuff()
|
||||
result = await coro # also fine, of course
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
return result
|
||||
def main() -> None:
|
||||
with context.LoggingContext("main"):
|
||||
task_context = context.LoggingContext("task")
|
||||
|
||||
with PreserveLoggingContext(task_context):
|
||||
logger.debug("foo")
|
||||
with PreserveLoggingContext(task_context):
|
||||
logger.debug("bar")
|
||||
|
||||
logger.debug("finished") # this will be logged against main
|
||||
```
|
||||
|
||||
Provided this pattern is followed all the way back up to the callchain
|
||||
to where the logcontext was set, this will make things work out ok:
|
||||
provided `do_some_stuff` and `more_stuff` follow the rules above, then
|
||||
so will `fun`.
|
||||
Or you could equivalently just manage the log context manually via
|
||||
`set_current_context`.
|
||||
|
||||
It's all too easy to forget to `await`: for instance if we forgot that
|
||||
`do_some_stuff` returned an awaitable, we might plough on regardless. This
|
||||
leads to a mess; it will probably work itself out eventually, but not
|
||||
before a load of stuff has been logged against the wrong context.
|
||||
(Normally, other things will break, more obviously, if you forget to
|
||||
`await`, so this tends not to be a major problem in practice.)
|
||||
|
||||
Of course sometimes you need to do something a bit fancier with your
|
||||
awaitable - not all code follows the linear A-then-B-then-C pattern.
|
||||
Notes on implementing more complex patterns are in later sections.
|
||||
|
||||
## Where you create a new awaitable, make it follow the rules
|
||||
|
||||
Most of the time, an awaitable comes from another synapse function.
|
||||
Sometimes, though, we need to make up a new awaitable, or we get an awaitable
|
||||
back from external code. We need to make it follow our rules.
|
||||
|
||||
The easy way to do it is by using `context.make_deferred_yieldable`. Suppose we want to implement
|
||||
`sleep`, which returns a deferred which will run its callbacks after a
|
||||
given number of seconds. That might look like:
|
||||
|
||||
```python
|
||||
# not a logcontext-rules-compliant function
|
||||
def get_sleep_deferred(seconds):
|
||||
d = defer.Deferred()
|
||||
reactor.callLater(seconds, d.callback, None)
|
||||
return d
|
||||
```
|
||||
|
||||
That doesn't follow the rules, but we can fix it by calling it through
|
||||
`context.make_deferred_yieldable`:
|
||||
|
||||
```python
|
||||
async def sleep(seconds):
|
||||
return await context.make_deferred_yieldable(get_sleep_deferred(seconds))
|
||||
```
|
||||
|
||||
## Fire-and-forget
|
||||
|
||||
Sometimes you want to fire off a chain of execution, but not wait for
|
||||
its result. That might look a bit like this:
|
||||
|
||||
```python
|
||||
async def do_request_handling():
|
||||
await foreground_operation()
|
||||
|
||||
# *don't* do this
|
||||
background_operation()
|
||||
|
||||
logger.debug("Request handling complete")
|
||||
|
||||
async def background_operation():
|
||||
await first_background_step()
|
||||
logger.debug("Completed first step")
|
||||
await second_background_step()
|
||||
logger.debug("Completed second step")
|
||||
```
|
||||
|
||||
The above code does a couple of steps in the background after
|
||||
`do_request_handling` has finished. The log lines are still logged
|
||||
against the `request_context` logcontext, which may or may not be
|
||||
desirable. There are two big problems with the above, however. The first
|
||||
problem is that, if `background_operation` returns an incomplete
|
||||
awaitable, it will expect its caller to `await` immediately, so will have
|
||||
cleared the logcontext. In this example, that means that 'Request
|
||||
handling complete' will be logged without any context.
|
||||
|
||||
The second problem, which is potentially even worse, is that when the
|
||||
awaitable returned by `background_operation` completes, it will restore
|
||||
the original logcontext. There is nothing waiting on that awaitable, so
|
||||
the logcontext will leak into the reactor and possibly get attached to
|
||||
some arbitrary future operation.
|
||||
|
||||
There are two potential solutions to this.
|
||||
|
||||
One option is to surround the call to `background_operation` with a
|
||||
`PreserveLoggingContext` call. That will reset the logcontext before
|
||||
starting `background_operation` (so the context restored when the
|
||||
deferred completes will be the empty logcontext), and will restore the
|
||||
current logcontext before continuing the foreground process:
|
||||
|
||||
```python
|
||||
async def do_request_handling():
|
||||
await foreground_operation()
|
||||
|
||||
# start background_operation off in the empty logcontext, to
|
||||
# avoid leaking the current context into the reactor.
|
||||
with PreserveLoggingContext():
|
||||
background_operation()
|
||||
|
||||
# this will now be logged against the request context
|
||||
logger.debug("Request handling complete")
|
||||
```
|
||||
|
||||
Obviously that option means that the operations done in
|
||||
`background_operation` would be not be logged against a logcontext
|
||||
(though that might be fixed by setting a different logcontext via a
|
||||
`with LoggingContext(...)` in `background_operation`).
|
||||
|
||||
The second option is to use `context.run_in_background`, which wraps a
|
||||
function so that it doesn't reset the logcontext even when it returns
|
||||
an incomplete awaitable, and adds a callback to the returned awaitable to
|
||||
reset the logcontext. In other words, it turns a function that follows
|
||||
the Synapse rules about logcontexts and awaitables into one which behaves
|
||||
more like an external function --- the opposite operation to that
|
||||
described in the previous section. It can be used like this:
|
||||
To drive an awaitable in the background, you can use `context.run_in_background`:
|
||||
|
||||
```python
|
||||
async def do_request_handling():
|
||||
@@ -261,104 +133,13 @@ async def do_request_handling():
|
||||
logger.debug("Request handling complete")
|
||||
```
|
||||
|
||||
## Passing synapse deferreds into third-party functions
|
||||
|
||||
A typical example of this is where we want to collect together two or
|
||||
more awaitables via `defer.gatherResults`:
|
||||
|
||||
```python
|
||||
a1 = operation1()
|
||||
a2 = operation2()
|
||||
a3 = defer.gatherResults([a1, a2])
|
||||
```
|
||||
|
||||
This is really a variation of the fire-and-forget problem above, in that
|
||||
we are firing off `a1` and `a2` without awaiting on them. The difference
|
||||
is that we now have third-party code attached to their callbacks. Anyway
|
||||
either technique given in the [Fire-and-forget](#fire-and-forget)
|
||||
section will work.
|
||||
|
||||
Of course, the new awaitable returned by `gather` needs to be
|
||||
wrapped in order to make it follow the logcontext rules before we can
|
||||
yield it, as described in [Where you create a new awaitable, make it
|
||||
follow the
|
||||
rules](#where-you-create-a-new-awaitable-make-it-follow-the-rules).
|
||||
|
||||
So, option one: reset the logcontext before starting the operations to
|
||||
be gathered:
|
||||
|
||||
```python
|
||||
async def do_request_handling():
|
||||
with PreserveLoggingContext():
|
||||
a1 = operation1()
|
||||
a2 = operation2()
|
||||
result = await defer.gatherResults([a1, a2])
|
||||
```
|
||||
|
||||
In this case particularly, though, option two, of using
|
||||
`context.run_in_background` almost certainly makes more sense, so that
|
||||
`operation1` and `operation2` are both logged against the original
|
||||
logcontext. This looks like:
|
||||
|
||||
```python
|
||||
async def do_request_handling():
|
||||
a1 = context.run_in_background(operation1)
|
||||
a2 = context.run_in_background(operation2)
|
||||
|
||||
result = await make_deferred_yieldable(defer.gatherResults([a1, a2]))
|
||||
result = await defer.gatherResults([a1, a2])
|
||||
```
|
||||
|
||||
## A note on garbage-collection of awaitable chains
|
||||
|
||||
It turns out that our logcontext rules do not play nicely with awaitable
|
||||
chains which get orphaned and garbage-collected.
|
||||
|
||||
Imagine we have some code that looks like this:
|
||||
|
||||
```python
|
||||
listener_queue = []
|
||||
|
||||
def on_something_interesting():
|
||||
for d in listener_queue:
|
||||
d.callback("foo")
|
||||
|
||||
async def await_something_interesting():
|
||||
new_awaitable = defer.Deferred()
|
||||
listener_queue.append(new_awaitable)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
await new_awaitable
|
||||
```
|
||||
|
||||
Obviously, the idea here is that we have a bunch of things which are
|
||||
waiting for an event. (It's just an example of the problem here, but a
|
||||
relatively common one.)
|
||||
|
||||
Now let's imagine two further things happen. First of all, whatever was
|
||||
waiting for the interesting thing goes away. (Perhaps the request times
|
||||
out, or something *even more* interesting happens.)
|
||||
|
||||
Secondly, let's suppose that we decide that the interesting thing is
|
||||
never going to happen, and we reset the listener queue:
|
||||
|
||||
```python
|
||||
def reset_listener_queue():
|
||||
listener_queue.clear()
|
||||
```
|
||||
|
||||
So, both ends of the awaitable chain have now dropped their references,
|
||||
and the awaitable chain is now orphaned, and will be garbage-collected at
|
||||
some point. Note that `await_something_interesting` is a coroutine,
|
||||
which Python implements as a generator function. When Python
|
||||
garbage-collects generator functions, it gives them a chance to
|
||||
clean up by making the `await` (or `yield`) raise a `GeneratorExit`
|
||||
exception. In our case, that means that the `__exit__` handler of
|
||||
`PreserveLoggingContext` will carefully restore the request context, but
|
||||
there is now nothing waiting for its return, so the request context is
|
||||
never cleared.
|
||||
|
||||
To reiterate, this problem only arises when *both* ends of a awaitable
|
||||
chain are dropped. Dropping the the reference to an awaitable you're
|
||||
supposed to be awaiting is bad practice, so this doesn't
|
||||
actually happen too much. Unfortunately, when it does happen, it will
|
||||
lead to leaked logcontexts which are incredibly hard to track down.
|
||||
`background_process_metrics.run_as_background_process` also exists if you want some
|
||||
automatic tracing and metrics for the background task.
|
||||
|
||||
@@ -86,12 +86,5 @@ import synapse.util # noqa: E402
|
||||
|
||||
__version__ = synapse.util.SYNAPSE_VERSION
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
# running the packaging tox test.
|
||||
from synapse.util.patch_inline_callbacks import do_patch
|
||||
|
||||
do_patch()
|
||||
|
||||
|
||||
check_rust_lib_up_to_date()
|
||||
|
||||
@@ -601,6 +601,12 @@ async def start(hs: "HomeServer") -> None:
|
||||
hs.get_datastores().main.db_pool.start_profiling()
|
||||
hs.get_pusherpool().start()
|
||||
|
||||
# Register background tasks required by this server. This must be done
|
||||
# somewhat manually due to the background tasks not being registered
|
||||
# unless handlers are instantiated.
|
||||
if hs.config.worker.run_background_tasks:
|
||||
hs.start_background_tasks()
|
||||
|
||||
# Log when we start the shut down process.
|
||||
hs.get_reactor().addSystemEventTrigger(
|
||||
"before", "shutdown", logger.info, "Shutting down..."
|
||||
|
||||
@@ -34,6 +34,7 @@ import logging
|
||||
import threading
|
||||
import typing
|
||||
import warnings
|
||||
from contextvars import ContextVar
|
||||
from types import TracebackType
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
@@ -653,13 +654,12 @@ class PreserveLoggingContext:
|
||||
)
|
||||
|
||||
|
||||
_thread_local = threading.local()
|
||||
_thread_local.current_context = SENTINEL_CONTEXT
|
||||
_current_context: ContextVar[LoggingContextOrSentinel] = ContextVar("current_context")
|
||||
|
||||
|
||||
def current_context() -> LoggingContextOrSentinel:
|
||||
"""Get the current logging context from thread local storage"""
|
||||
return getattr(_thread_local, "current_context", SENTINEL_CONTEXT)
|
||||
return _current_context.get(SENTINEL_CONTEXT)
|
||||
|
||||
|
||||
def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel:
|
||||
@@ -680,7 +680,7 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe
|
||||
if current is not context:
|
||||
rusage = get_thread_resource_usage()
|
||||
current.stop(rusage)
|
||||
_thread_local.current_context = context
|
||||
_current_context.set(context)
|
||||
context.start(rusage)
|
||||
|
||||
return current
|
||||
@@ -796,7 +796,6 @@ def run_in_background(
|
||||
CRITICAL error about an unhandled error will be logged without much
|
||||
indication about where it came from.
|
||||
"""
|
||||
current = current_context()
|
||||
try:
|
||||
res = f(*args, **kwargs)
|
||||
except Exception:
|
||||
@@ -825,23 +824,6 @@ def run_in_background(
|
||||
# optimise out the messing about
|
||||
return d
|
||||
|
||||
# The function may have reset the context before returning, so
|
||||
# we need to restore it now.
|
||||
ctx = set_current_context(current)
|
||||
|
||||
# The original context will be restored when the deferred
|
||||
# completes, but there is nothing waiting for it, so it will
|
||||
# get leaked into the reactor or some other function which
|
||||
# wasn't expecting it. We therefore need to reset the context
|
||||
# here.
|
||||
#
|
||||
# (If this feels asymmetric, consider it this way: we are
|
||||
# effectively forking a new thread of execution. We are
|
||||
# probably currently within a ``with LoggingContext()`` block,
|
||||
# which is supposed to have a single entry and exit point. But
|
||||
# by spawning off another deferred, we are effectively
|
||||
# adding a new exit point.)
|
||||
d.addBoth(_set_context_cb, ctx)
|
||||
return d
|
||||
|
||||
|
||||
@@ -861,65 +843,20 @@ def run_coroutine_in_background(
|
||||
cannot change the log contexts.
|
||||
"""
|
||||
|
||||
current = current_context()
|
||||
d = defer.ensureDeferred(coroutine)
|
||||
|
||||
# The function may have reset the context before returning, so
|
||||
# we need to restore it now.
|
||||
ctx = set_current_context(current)
|
||||
|
||||
# The original context will be restored when the deferred
|
||||
# completes, but there is nothing waiting for it, so it will
|
||||
# get leaked into the reactor or some other function which
|
||||
# wasn't expecting it. We therefore need to reset the context
|
||||
# here.
|
||||
#
|
||||
# (If this feels asymmetric, consider it this way: we are
|
||||
# effectively forking a new thread of execution. We are
|
||||
# probably currently within a ``with LoggingContext()`` block,
|
||||
# which is supposed to have a single entry and exit point. But
|
||||
# by spawning off another deferred, we are effectively
|
||||
# adding a new exit point.)
|
||||
d.addBoth(_set_context_cb, ctx)
|
||||
return d
|
||||
return defer.ensureDeferred(coroutine)
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
# TODO: This function is a no-op now and should be removed in a follow-up PR.
|
||||
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
|
||||
"""Given a deferred, make it follow the Synapse logcontext rules:
|
||||
|
||||
If the deferred has completed, essentially does nothing (just returns another
|
||||
completed deferred with the result/failure).
|
||||
|
||||
If the deferred has not yet completed, resets the logcontext before
|
||||
returning a deferred. Then, when the deferred completes, restores the
|
||||
current logcontext before running callbacks/errbacks.
|
||||
|
||||
(This is more-or-less the opposite operation to run_in_background.)
|
||||
"""
|
||||
if deferred.called and not deferred.paused:
|
||||
# it looks like this deferred is ready to run any callbacks we give it
|
||||
# immediately. We may as well optimise out the logcontext faffery.
|
||||
return deferred
|
||||
|
||||
# ok, we can't be sure that a yield won't block, so let's reset the
|
||||
# logcontext, and add a callback to the deferred to restore it.
|
||||
prev_context = set_current_context(SENTINEL_CONTEXT)
|
||||
deferred.addBoth(_set_context_cb, prev_context)
|
||||
return deferred
|
||||
|
||||
|
||||
ResultT = TypeVar("ResultT")
|
||||
|
||||
|
||||
def _set_context_cb(result: ResultT, context: LoggingContextOrSentinel) -> ResultT:
|
||||
"""A callback function which just sets the logging context"""
|
||||
set_current_context(context)
|
||||
return result
|
||||
|
||||
|
||||
def defer_to_thread(
|
||||
reactor: "ISynapseReactor", f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
|
||||
) -> "defer.Deferred[R]":
|
||||
@@ -931,9 +868,6 @@ def defer_to_thread(
|
||||
logcontext (so its CPU usage metrics will get attributed to the current
|
||||
logcontext). `f` should preserve the logcontext it is given.
|
||||
|
||||
The result deferred follows the Synapse logcontext rules: you should `yield`
|
||||
on it.
|
||||
|
||||
Args:
|
||||
reactor: The reactor in whose main thread the Deferred will be invoked,
|
||||
and whose threadpool we should use for the function.
|
||||
@@ -971,9 +905,6 @@ def defer_to_threadpool(
|
||||
logcontext (so its CPU usage metrics will get attributed to the current
|
||||
logcontext). `f` should preserve the logcontext it is given.
|
||||
|
||||
The result deferred follows the Synapse logcontext rules: you should `yield`
|
||||
on it.
|
||||
|
||||
Args:
|
||||
reactor: The reactor in whose main thread the Deferred will be invoked.
|
||||
Normally this will be hs.get_reactor().
|
||||
@@ -991,18 +922,6 @@ def defer_to_threadpool(
|
||||
A Deferred which fires a callback with the result of `f`, or an
|
||||
errback if `f` throws an exception.
|
||||
"""
|
||||
curr_context = current_context()
|
||||
if not curr_context:
|
||||
logger.warning(
|
||||
"Calling defer_to_threadpool from sentinel context: metrics will be lost"
|
||||
)
|
||||
parent_context = None
|
||||
else:
|
||||
assert isinstance(curr_context, LoggingContext)
|
||||
parent_context = curr_context
|
||||
|
||||
def g() -> R:
|
||||
with LoggingContext(str(curr_context), parent_context=parent_context):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
|
||||
return make_deferred_yieldable(
|
||||
threads.deferToThreadPool(reactor, threadpool, f, *args, **kwargs)
|
||||
)
|
||||
|
||||
@@ -223,10 +223,9 @@ def run_as_background_process(
|
||||
This should be used to wrap processes which are fired off to run in the
|
||||
background, instead of being associated with a particular request.
|
||||
|
||||
It returns a Deferred which completes when the function completes, but it doesn't
|
||||
follow the synapse logcontext rules, which makes it appropriate for passing to
|
||||
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
|
||||
normal synapse async function).
|
||||
It returns a Deferred which completes when the function completes, which makes it
|
||||
appropriate for passing to clock.looping_call and friends (or for
|
||||
firing-and-forgetting in the middle of a normal synapse async function).
|
||||
|
||||
Args:
|
||||
desc: a description for this background process type
|
||||
@@ -241,8 +240,6 @@ def run_as_background_process(
|
||||
|
||||
Returns:
|
||||
Deferred which returns the result of func, or `None` if func raises.
|
||||
Note that the returned Deferred does not follow the synapse logcontext
|
||||
rules.
|
||||
"""
|
||||
|
||||
async def run() -> Optional[R]:
|
||||
|
||||
@@ -237,10 +237,9 @@ def run_as_background_process(
|
||||
This should be used to wrap processes which are fired off to run in the
|
||||
background, instead of being associated with a particular request.
|
||||
|
||||
It returns a Deferred which completes when the function completes, but it doesn't
|
||||
follow the synapse logcontext rules, which makes it appropriate for passing to
|
||||
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
|
||||
normal synapse async function).
|
||||
It returns a Deferred which completes when the function completes, which makes it
|
||||
appropriate for passing to clock.looping_call and friends (or for
|
||||
firing-and-forgetting in the middle of a normal synapse async function).
|
||||
|
||||
Args:
|
||||
desc: a description for this background process type
|
||||
@@ -255,8 +254,6 @@ def run_as_background_process(
|
||||
|
||||
Returns:
|
||||
Deferred which returns the result of func, or `None` if func raises.
|
||||
Note that the returned Deferred does not follow the synapse logcontext
|
||||
rules.
|
||||
"""
|
||||
|
||||
logger.warning(
|
||||
@@ -1375,9 +1372,7 @@ class ModuleApi:
|
||||
|
||||
Args:
|
||||
f: The function to call repeatedly. f can be either synchronous or
|
||||
asynchronous, and must follow Synapse's logcontext rules.
|
||||
More info about logcontexts is available at
|
||||
https://element-hq.github.io/synapse/latest/log_contexts.html
|
||||
asynchronous.
|
||||
msec: How long to wait between calls in milliseconds.
|
||||
*args: Positional arguments to pass to function.
|
||||
desc: The background task's description. Default to the function's name.
|
||||
@@ -1431,9 +1426,7 @@ class ModuleApi:
|
||||
Args:
|
||||
msec: How long to wait before calling, in milliseconds.
|
||||
f: The function to call once. f can be either synchronous or
|
||||
asynchronous, and must follow Synapse's logcontext rules.
|
||||
More info about logcontexts is available at
|
||||
https://element-hq.github.io/synapse/latest/log_contexts.html
|
||||
asynchronous.
|
||||
*args: Positional arguments to pass to function.
|
||||
desc: The background task's description. Default to the function's name.
|
||||
**kwargs: Keyword arguments to pass to function.
|
||||
@@ -1668,10 +1661,9 @@ class ModuleApi:
|
||||
This should be used to wrap processes which are fired off to run in the
|
||||
background, instead of being associated with a particular request.
|
||||
|
||||
It returns a Deferred which completes when the function completes, but it doesn't
|
||||
follow the synapse logcontext rules, which makes it appropriate for passing to
|
||||
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
|
||||
normal synapse async function).
|
||||
It returns a Deferred which completes when the function completes, which makes
|
||||
it appropriate for passing to clock.looping_call and friends (or for
|
||||
firing-and-forgetting in the middle of a normal synapse async function).
|
||||
|
||||
Args:
|
||||
desc: a description for this background process type
|
||||
@@ -1686,8 +1678,6 @@ class ModuleApi:
|
||||
|
||||
Returns:
|
||||
Deferred which returns the result of func, or `None` if func raises.
|
||||
Note that the returned Deferred does not follow the synapse logcontext
|
||||
rules.
|
||||
"""
|
||||
return _run_as_background_process(
|
||||
desc, self.server_name, func, *args, bg_start_span=bg_start_span, **kwargs
|
||||
|
||||
@@ -366,12 +366,6 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
self.datastores = Databases(self.DATASTORE_CLASS, self)
|
||||
logger.info("Finished setting up.")
|
||||
|
||||
# Register background tasks required by this server. This must be done
|
||||
# somewhat manually due to the background tasks not being registered
|
||||
# unless handlers are instantiated.
|
||||
if self.config.worker.run_background_tasks:
|
||||
self.setup_background_tasks()
|
||||
|
||||
def __del__(self) -> None:
|
||||
"""
|
||||
Called when an the homeserver is garbage collected.
|
||||
@@ -410,7 +404,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
appropriate listeners.
|
||||
"""
|
||||
|
||||
def setup_background_tasks(self) -> None:
|
||||
def start_background_tasks(self) -> None:
|
||||
"""
|
||||
Some handlers have side effects on instantiation (like registering
|
||||
background updates). This function causes them to be fetched, and
|
||||
|
||||
@@ -84,9 +84,6 @@ class AbstractObservableDeferred(Generic[_T], metaclass=abc.ABCMeta):
|
||||
This returns a brand new deferred that is resolved when the underlying
|
||||
deferred is resolved. Interacting with the returned deferred does not
|
||||
effect the underlying deferred.
|
||||
|
||||
Note that the returned Deferred doesn't follow the Synapse logcontext rules -
|
||||
you will probably want to `make_deferred_yieldable` it.
|
||||
"""
|
||||
...
|
||||
|
||||
@@ -100,11 +97,6 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
|
||||
|
||||
Cancelling or otherwise resolving an observer will not affect the original
|
||||
ObservableDeferred.
|
||||
|
||||
NB that it does not attempt to do anything with logcontexts; in general
|
||||
you should probably make_deferred_yieldable the deferreds
|
||||
returned by `observe`, and ensure that the original deferred runs its
|
||||
callbacks in the sentinel logcontext.
|
||||
"""
|
||||
|
||||
__slots__ = ["_deferred", "_observers", "_result"]
|
||||
@@ -861,16 +853,12 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
|
||||
"""Prevent a `Deferred` from being cancelled by wrapping it in another `Deferred`.
|
||||
|
||||
Args:
|
||||
deferred: The `Deferred` to protect against cancellation. Must not follow the
|
||||
Synapse logcontext rules.
|
||||
deferred: The `Deferred` to protect against cancellation.
|
||||
|
||||
Returns:
|
||||
A new `Deferred`, which will contain the result of the original `Deferred`.
|
||||
The new `Deferred` will not propagate cancellation through to the original.
|
||||
When cancelled, the new `Deferred` will fail with a `CancelledError`.
|
||||
|
||||
The new `Deferred` will not follow the Synapse logcontext rules and should be
|
||||
wrapped with `make_deferred_yieldable`.
|
||||
"""
|
||||
new_deferred: "defer.Deferred[T]" = defer.Deferred()
|
||||
deferred.chainDeferred(new_deferred)
|
||||
@@ -896,8 +884,7 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
|
||||
resolve with a `CancelledError` until the original awaitable resolves.
|
||||
|
||||
Args:
|
||||
deferred: The coroutine or `Deferred` to protect against cancellation. May
|
||||
optionally follow the Synapse logcontext rules.
|
||||
deferred: The coroutine or `Deferred` to protect against cancellation.
|
||||
|
||||
Returns:
|
||||
A new `Deferred`, which will contain the result of the original coroutine or
|
||||
@@ -906,10 +893,6 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
|
||||
|
||||
When cancelled, the new `Deferred` will wait until the original coroutine or
|
||||
`Deferred` resolves before failing with a `CancelledError`.
|
||||
|
||||
The new `Deferred` will follow the Synapse logcontext rules if `awaitable`
|
||||
follows the Synapse logcontext rules. Otherwise the new `Deferred` should be
|
||||
wrapped with `make_deferred_yieldable`.
|
||||
"""
|
||||
|
||||
# First, convert the awaitable into a `Deferred`.
|
||||
|
||||
@@ -295,9 +295,6 @@ class DeferredCache(Generic[KT, VT]):
|
||||
*original* `value`, (c) any future calls to `get()` will complete with the
|
||||
result from the *new* `value`.
|
||||
|
||||
It is expected that `value` does *not* follow the synapse logcontext rules - ie,
|
||||
if it is incomplete, it runs its callbacks in the sentinel context.
|
||||
|
||||
Args:
|
||||
key: Key to be set
|
||||
value: a deferred which will complete with a result to add to the cache
|
||||
|
||||
@@ -234,11 +234,9 @@ class ResponseCache(Generic[KV]):
|
||||
) -> RV:
|
||||
"""Wrap together a *get* and *set* call, taking care of logcontexts
|
||||
|
||||
First looks up the key in the cache, and if it is present makes it
|
||||
follow the synapse logcontext rules and returns it.
|
||||
First looks up the key in the cache, and if present, returns it.
|
||||
|
||||
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
|
||||
follow the synapse logcontext rules, and adds the result to the cache.
|
||||
Otherwise, makes a call to *callback(*args, **kwargs)* and adds the result to the cache.
|
||||
|
||||
Example usage:
|
||||
|
||||
|
||||
@@ -1,250 +0,0 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
# Originally licensed under the Apache License, Version 2.0:
|
||||
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
||||
#
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
|
||||
import functools
|
||||
import sys
|
||||
from types import GeneratorType
|
||||
from typing import Any, Callable, Generator, List, TypeVar, cast
|
||||
|
||||
from typing_extensions import ParamSpec
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import Deferred
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
# Tracks if we've already patched inlineCallbacks
|
||||
_already_patched = False
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
P = ParamSpec("P")
|
||||
|
||||
|
||||
def do_patch() -> None:
|
||||
"""
|
||||
Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit
|
||||
"""
|
||||
|
||||
from synapse.logging.context import current_context
|
||||
|
||||
global _already_patched
|
||||
|
||||
orig_inline_callbacks = defer.inlineCallbacks
|
||||
if _already_patched:
|
||||
return
|
||||
|
||||
def new_inline_callbacks(
|
||||
f: Callable[P, Generator["Deferred[object]", object, T]],
|
||||
) -> Callable[P, "Deferred[T]"]:
|
||||
@functools.wraps(f)
|
||||
def wrapped(*args: P.args, **kwargs: P.kwargs) -> "Deferred[T]":
|
||||
start_context = current_context()
|
||||
changes: List[str] = []
|
||||
orig: Callable[P, "Deferred[T]"] = orig_inline_callbacks(
|
||||
_check_yield_points(f, changes)
|
||||
)
|
||||
|
||||
try:
|
||||
res: "Deferred[T]" = orig(*args, **kwargs)
|
||||
except Exception:
|
||||
if current_context() != start_context:
|
||||
for err in changes:
|
||||
print(err, file=sys.stderr)
|
||||
|
||||
err = "%s changed context from %s to %s on exception" % (
|
||||
f,
|
||||
start_context,
|
||||
current_context(),
|
||||
)
|
||||
print(err, file=sys.stderr)
|
||||
raise Exception(err)
|
||||
raise
|
||||
|
||||
if not isinstance(res, Deferred) or res.called:
|
||||
if current_context() != start_context:
|
||||
for err in changes:
|
||||
print(err, file=sys.stderr)
|
||||
|
||||
err = "Completed %s changed context from %s to %s" % (
|
||||
f,
|
||||
start_context,
|
||||
current_context(),
|
||||
)
|
||||
# print the error to stderr because otherwise all we
|
||||
# see in travis-ci is the 500 error
|
||||
print(err, file=sys.stderr)
|
||||
raise Exception(err)
|
||||
return res
|
||||
|
||||
if current_context():
|
||||
err = (
|
||||
"%s returned incomplete deferred in non-sentinel context "
|
||||
"%s (start was %s)"
|
||||
) % (f, current_context(), start_context)
|
||||
print(err, file=sys.stderr)
|
||||
raise Exception(err)
|
||||
|
||||
def check_ctx(r: T) -> T:
|
||||
if current_context() != start_context:
|
||||
for err in changes:
|
||||
print(err, file=sys.stderr)
|
||||
err = "%s completion of %s changed context from %s to %s" % (
|
||||
"Failure" if isinstance(r, Failure) else "Success",
|
||||
f,
|
||||
start_context,
|
||||
current_context(),
|
||||
)
|
||||
print(err, file=sys.stderr)
|
||||
raise Exception(err)
|
||||
return r
|
||||
|
||||
res.addBoth(check_ctx)
|
||||
return res
|
||||
|
||||
return wrapped
|
||||
|
||||
defer.inlineCallbacks = new_inline_callbacks
|
||||
_already_patched = True
|
||||
|
||||
|
||||
def _check_yield_points(
|
||||
f: Callable[P, Generator["Deferred[object]", object, T]],
|
||||
changes: List[str],
|
||||
) -> Callable:
|
||||
"""Wraps a generator that is about to be passed to defer.inlineCallbacks
|
||||
checking that after every yield the log contexts are correct.
|
||||
|
||||
It's perfectly valid for log contexts to change within a function, e.g. due
|
||||
to new Measure blocks, so such changes are added to the given `changes`
|
||||
list instead of triggering an exception.
|
||||
|
||||
Args:
|
||||
f: generator function to wrap
|
||||
changes: A list of strings detailing how the contexts
|
||||
changed within a function.
|
||||
|
||||
Returns:
|
||||
function
|
||||
"""
|
||||
|
||||
from synapse.logging.context import current_context
|
||||
|
||||
@functools.wraps(f)
|
||||
def check_yield_points_inner(
|
||||
*args: P.args, **kwargs: P.kwargs
|
||||
) -> Generator["Deferred[object]", object, T]:
|
||||
gen = f(*args, **kwargs)
|
||||
|
||||
# We only patch if we have a native generator function, as we rely on
|
||||
# `gen.gi_frame`.
|
||||
if not isinstance(gen, GeneratorType):
|
||||
ret = yield from gen
|
||||
return ret
|
||||
|
||||
last_yield_line_no = gen.gi_frame.f_lineno
|
||||
result: Any = None
|
||||
while True:
|
||||
expected_context = current_context()
|
||||
|
||||
try:
|
||||
isFailure = isinstance(result, Failure)
|
||||
if isFailure:
|
||||
d = result.throwExceptionIntoGenerator(gen)
|
||||
else:
|
||||
d = gen.send(result)
|
||||
except StopIteration as e:
|
||||
if current_context() != expected_context:
|
||||
# This happens when the context is lost sometime *after* the
|
||||
# final yield and returning. E.g. we forgot to yield on a
|
||||
# function that returns a deferred.
|
||||
#
|
||||
# We don't raise here as it's perfectly valid for contexts to
|
||||
# change in a function, as long as it sets the correct context
|
||||
# on resolving (which is checked separately).
|
||||
err = (
|
||||
"Function %r returned and changed context from %s to %s,"
|
||||
" in %s between %d and end of func"
|
||||
% (
|
||||
f.__qualname__,
|
||||
expected_context,
|
||||
current_context(),
|
||||
f.__code__.co_filename,
|
||||
last_yield_line_no,
|
||||
)
|
||||
)
|
||||
changes.append(err)
|
||||
# The `StopIteration` contains the return value from the
|
||||
# generator.
|
||||
return cast(T, e.value)
|
||||
|
||||
frame = gen.gi_frame
|
||||
|
||||
if isinstance(d, defer.Deferred) and not d.called:
|
||||
# This happens if we yield on a deferred that doesn't follow
|
||||
# the log context rules without wrapping in a `make_deferred_yieldable`.
|
||||
# We raise here as this should never happen.
|
||||
if current_context():
|
||||
err = (
|
||||
"%s yielded with context %s rather than sentinel,"
|
||||
" yielded on line %d in %s"
|
||||
% (
|
||||
frame.f_code.co_name,
|
||||
current_context(),
|
||||
frame.f_lineno,
|
||||
frame.f_code.co_filename,
|
||||
)
|
||||
)
|
||||
raise Exception(err)
|
||||
|
||||
# the wrapped function yielded a Deferred: yield it back up to the parent
|
||||
# inlineCallbacks().
|
||||
try:
|
||||
result = yield d
|
||||
except Exception:
|
||||
# this will fish an earlier Failure out of the stack where possible, and
|
||||
# thus is preferable to passing in an exception to the Failure
|
||||
# constructor, since it results in less stack-mangling.
|
||||
result = Failure()
|
||||
|
||||
if current_context() != expected_context:
|
||||
# This happens because the context is lost sometime *after* the
|
||||
# previous yield and *after* the current yield. E.g. the
|
||||
# deferred we waited on didn't follow the rules, or we forgot to
|
||||
# yield on a function between the two yield points.
|
||||
#
|
||||
# We don't raise here as its perfectly valid for contexts to
|
||||
# change in a function, as long as it sets the correct context
|
||||
# on resolving (which is checked separately).
|
||||
err = (
|
||||
"%s changed context from %s to %s, happened between lines %d and %d in %s"
|
||||
% (
|
||||
frame.f_code.co_name,
|
||||
expected_context,
|
||||
current_context(),
|
||||
last_yield_line_no,
|
||||
frame.f_lineno,
|
||||
frame.f_code.co_filename,
|
||||
)
|
||||
)
|
||||
changes.append(err)
|
||||
|
||||
last_yield_line_no = frame.f_lineno
|
||||
|
||||
return check_yield_points_inner
|
||||
@@ -21,9 +21,4 @@
|
||||
|
||||
from twisted.trial import util
|
||||
|
||||
from synapse.util.patch_inline_callbacks import do_patch
|
||||
|
||||
# attempt to do the patch before we load any synapse code
|
||||
do_patch()
|
||||
|
||||
util.DEFAULT_TIMEOUT_DURATION = 20
|
||||
|
||||
@@ -355,7 +355,7 @@ class DescriptorTestCase(unittest.TestCase):
|
||||
d = obj.fn(1)
|
||||
self.assertEqual(
|
||||
current_context(),
|
||||
SENTINEL_CONTEXT,
|
||||
c1,
|
||||
)
|
||||
yield d
|
||||
self.fail("No exception thrown")
|
||||
@@ -849,7 +849,7 @@ class CachedListDescriptorTestCase(unittest.TestCase):
|
||||
|
||||
# start the lookup off
|
||||
d1 = obj.list_fn([10, 20], 2)
|
||||
self.assertEqual(current_context(), SENTINEL_CONTEXT)
|
||||
self.assertEqual(current_context(), c1)
|
||||
r = yield d1
|
||||
self.assertEqual(current_context(), c1)
|
||||
obj.mock.assert_called_once_with({10, 20}, 2)
|
||||
|
||||
Reference in New Issue
Block a user