mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-17 02:10:27 +00:00
Compare commits
9 Commits
travis/cre
...
madlittlem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6ce2f3e59d | ||
|
|
93044f4c5b | ||
|
|
c7a80b63ec | ||
|
|
1f384b0e21 | ||
|
|
3e66e0a1b8 | ||
|
|
0c8759bbb6 | ||
|
|
4303879cfe | ||
|
|
3742b3b3fb | ||
|
|
224cb3f827 |
1
changelog.d/18871.misc
Normal file
1
changelog.d/18871.misc
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Store the `LoggingContext` in a `ContextVar` instead of a thread-local variable.
|
||||||
@@ -1,12 +1,12 @@
|
|||||||
# Log Contexts
|
# Log Contexts
|
||||||
|
|
||||||
To help track the processing of individual requests, synapse uses a
|
To help track the processing of individual requests, synapse uses a
|
||||||
'`log context`' to track which request it is handling at any given
|
`LoggingContext` to track which request it is handling at any given
|
||||||
moment. This is done via a thread-local variable; a `logging.Filter` is
|
moment. This is done via a `ContextVar` variable; a `logging.Filter` is
|
||||||
then used to fish the information back out of the thread-local variable
|
then used to fish the information back out of the `ContextVar` variable
|
||||||
and add it to each log record.
|
and add it to each log record.
|
||||||
|
|
||||||
Logcontexts are also used for CPU and database accounting, so that we
|
Log contexts are also used for CPU and database accounting, so that we
|
||||||
can track which requests were responsible for high CPU use or database
|
can track which requests were responsible for high CPU use or database
|
||||||
activity.
|
activity.
|
||||||
|
|
||||||
@@ -14,18 +14,11 @@ The `synapse.logging.context` module provides facilities for managing
|
|||||||
the current log context (as well as providing the `LoggingContextFilter`
|
the current log context (as well as providing the `LoggingContextFilter`
|
||||||
class).
|
class).
|
||||||
|
|
||||||
Asynchronous functions make the whole thing complicated, so this document describes
|
In this document, "awaitable" refers to any object which can be `await`ed. In the
|
||||||
how it all works, and how to write code which follows the rules.
|
context of Synapse, that normally means either a coroutine or a Twisted
|
||||||
|
|
||||||
In this document, "awaitable" refers to any object which can be `await`ed. In the context of
|
|
||||||
Synapse, that normally means either a coroutine or a Twisted
|
|
||||||
[`Deferred`](https://twistedmatrix.com/documents/current/api/twisted.internet.defer.Deferred.html).
|
[`Deferred`](https://twistedmatrix.com/documents/current/api/twisted.internet.defer.Deferred.html).
|
||||||
|
|
||||||
## Logcontexts without asynchronous code
|
## Basic usage
|
||||||
|
|
||||||
In the absence of any asynchronous voodoo, things are simple enough. As with
|
|
||||||
any code of this nature, the rule is that our function should leave
|
|
||||||
things as it found them:
|
|
||||||
|
|
||||||
```python
|
```python
|
||||||
from synapse.logging import context # omitted from future snippets
|
from synapse.logging import context # omitted from future snippets
|
||||||
@@ -45,7 +38,7 @@ def do_request_handling():
|
|||||||
logger.debug("phew") # this will be logged against request_id
|
logger.debug("phew") # this will be logged against request_id
|
||||||
```
|
```
|
||||||
|
|
||||||
LoggingContext implements the context management methods, so the above
|
`LoggingContext` implements the context management methods, so the above
|
||||||
can be written much more succinctly as:
|
can be written much more succinctly as:
|
||||||
|
|
||||||
```python
|
```python
|
||||||
@@ -59,197 +52,76 @@ def do_request_handling():
|
|||||||
logger.debug("phew")
|
logger.debug("phew")
|
||||||
```
|
```
|
||||||
|
|
||||||
## Using logcontexts with awaitables
|
### The `sentinel` context
|
||||||
|
|
||||||
Awaitables break the linear flow of code so that there is no longer a single entry point
|
The default context is `context.SENTINEL_CONTEXT`, which is a sentinel value to
|
||||||
where we should set the logcontext and a single exit point where we should remove it.
|
represent the root context. This is what is used when there is no other context set.
|
||||||
|
|
||||||
Consider the example above, where `do_request_handling` needs to do some
|
No CPU/database usage metrics are recorded against the `sentinel` context.
|
||||||
blocking operation, and returns an awaitable:
|
|
||||||
|
Ideally, nothing from the Synapse homeserver would be logged against the `sentinel`
|
||||||
|
context as we want to know where the logs came from. In practice, this is not always the
|
||||||
|
case yet especially outside of request handling.
|
||||||
|
|
||||||
|
Previously, the `sentinel` context played a bigger role when we had to carefully deal
|
||||||
|
with thread-local storage; as we had to make sure to not leak another context to another
|
||||||
|
task after we gave up control to the reactor so we set the
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
### `PreserveLoggingContext`
|
||||||
|
|
||||||
|
In a similar vein of no longer as relevant, `PreserveLoggingContext` is another context
|
||||||
|
manager helper and a little bit of syntactic sugar to set the current log context
|
||||||
|
(without finishing it) and restore the previous context on exit.
|
||||||
|
|
||||||
```python
|
```python
|
||||||
async def handle_request(request_id):
|
import logging
|
||||||
with context.LoggingContext() as request_context:
|
from synapse.logging.context import LoggingContext
|
||||||
request_context.request = request_id
|
|
||||||
await do_request_handling()
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
with context.LoggingContext("main"):
|
||||||
|
task_context = context.LoggingContext("task")
|
||||||
|
|
||||||
|
with task_context:
|
||||||
|
logger.debug("foo")
|
||||||
|
|
||||||
|
# Bad: will throw an error because `task_context` is already finished
|
||||||
|
with task_context:
|
||||||
|
logger.debug("bar")
|
||||||
|
|
||||||
logger.debug("finished")
|
logger.debug("finished")
|
||||||
```
|
```
|
||||||
|
|
||||||
In the above flow:
|
This can be fixed by using `PreserveLoggingContext`:
|
||||||
|
|
||||||
- The logcontext is set
|
|
||||||
- `do_request_handling` is called, and returns an awaitable
|
|
||||||
- `handle_request` awaits the awaitable
|
|
||||||
- Execution of `handle_request` is suspended
|
|
||||||
|
|
||||||
So we have stopped processing the request (and will probably go on to
|
|
||||||
start processing the next), without clearing the logcontext.
|
|
||||||
|
|
||||||
To circumvent this problem, synapse code assumes that, wherever you have
|
|
||||||
an awaitable, you will want to `await` it. To that end, wherever
|
|
||||||
functions return awaitables, we adopt the following conventions:
|
|
||||||
|
|
||||||
**Rules for functions returning awaitables:**
|
|
||||||
|
|
||||||
> - If the awaitable is already complete, the function returns with the
|
|
||||||
> same logcontext it started with.
|
|
||||||
> - If the awaitable is incomplete, the function clears the logcontext
|
|
||||||
> before returning; when the awaitable completes, it restores the
|
|
||||||
> logcontext before running any callbacks.
|
|
||||||
|
|
||||||
That sounds complicated, but actually it means a lot of code (including
|
|
||||||
the example above) "just works". There are two cases:
|
|
||||||
|
|
||||||
- If `do_request_handling` returns a completed awaitable, then the
|
|
||||||
logcontext will still be in place. In this case, execution will
|
|
||||||
continue immediately after the `await`; the "finished" line will
|
|
||||||
be logged against the right context, and the `with` block restores
|
|
||||||
the original context before we return to the caller.
|
|
||||||
- If the returned awaitable is incomplete, `do_request_handling` clears
|
|
||||||
the logcontext before returning. The logcontext is therefore clear
|
|
||||||
when `handle_request` `await`s the awaitable.
|
|
||||||
|
|
||||||
Once `do_request_handling`'s awaitable completes, it will reinstate
|
|
||||||
the logcontext, before running the second half of `handle_request`,
|
|
||||||
so again the "finished" line will be logged against the right context,
|
|
||||||
and the `with` block restores the original context.
|
|
||||||
|
|
||||||
As an aside, it's worth noting that `handle_request` follows our rules
|
|
||||||
- though that only matters if the caller has its own logcontext which it
|
|
||||||
cares about.
|
|
||||||
|
|
||||||
The following sections describe pitfalls and helpful patterns when
|
|
||||||
implementing these rules.
|
|
||||||
|
|
||||||
Always await your awaitables
|
|
||||||
----------------------------
|
|
||||||
|
|
||||||
Whenever you get an awaitable back from a function, you should `await` on
|
|
||||||
it as soon as possible. Do not pass go; do not do any logging; do not
|
|
||||||
call any other functions.
|
|
||||||
|
|
||||||
```python
|
```python
|
||||||
async def fun():
|
import logging
|
||||||
logger.debug("starting")
|
from synapse.logging.context import LoggingContext
|
||||||
await do_some_stuff() # just like this
|
|
||||||
|
|
||||||
coro = more_stuff()
|
logger = logging.getLogger(__name__)
|
||||||
result = await coro # also fine, of course
|
|
||||||
|
|
||||||
return result
|
def main() -> None:
|
||||||
|
with context.LoggingContext("main"):
|
||||||
|
task_context = context.LoggingContext("task")
|
||||||
|
|
||||||
|
with PreserveLoggingContext(task_context):
|
||||||
|
logger.debug("foo")
|
||||||
|
with PreserveLoggingContext(task_context):
|
||||||
|
logger.debug("bar")
|
||||||
|
|
||||||
|
logger.debug("finished") # this will be logged against main
|
||||||
```
|
```
|
||||||
|
|
||||||
Provided this pattern is followed all the way back up to the callchain
|
Or you could equivalently just manage the log context manually via
|
||||||
to where the logcontext was set, this will make things work out ok:
|
`set_current_context`.
|
||||||
provided `do_some_stuff` and `more_stuff` follow the rules above, then
|
|
||||||
so will `fun`.
|
|
||||||
|
|
||||||
It's all too easy to forget to `await`: for instance if we forgot that
|
|
||||||
`do_some_stuff` returned an awaitable, we might plough on regardless. This
|
|
||||||
leads to a mess; it will probably work itself out eventually, but not
|
|
||||||
before a load of stuff has been logged against the wrong context.
|
|
||||||
(Normally, other things will break, more obviously, if you forget to
|
|
||||||
`await`, so this tends not to be a major problem in practice.)
|
|
||||||
|
|
||||||
Of course sometimes you need to do something a bit fancier with your
|
|
||||||
awaitable - not all code follows the linear A-then-B-then-C pattern.
|
|
||||||
Notes on implementing more complex patterns are in later sections.
|
|
||||||
|
|
||||||
## Where you create a new awaitable, make it follow the rules
|
|
||||||
|
|
||||||
Most of the time, an awaitable comes from another synapse function.
|
|
||||||
Sometimes, though, we need to make up a new awaitable, or we get an awaitable
|
|
||||||
back from external code. We need to make it follow our rules.
|
|
||||||
|
|
||||||
The easy way to do it is by using `context.make_deferred_yieldable`. Suppose we want to implement
|
|
||||||
`sleep`, which returns a deferred which will run its callbacks after a
|
|
||||||
given number of seconds. That might look like:
|
|
||||||
|
|
||||||
```python
|
|
||||||
# not a logcontext-rules-compliant function
|
|
||||||
def get_sleep_deferred(seconds):
|
|
||||||
d = defer.Deferred()
|
|
||||||
reactor.callLater(seconds, d.callback, None)
|
|
||||||
return d
|
|
||||||
```
|
|
||||||
|
|
||||||
That doesn't follow the rules, but we can fix it by calling it through
|
|
||||||
`context.make_deferred_yieldable`:
|
|
||||||
|
|
||||||
```python
|
|
||||||
async def sleep(seconds):
|
|
||||||
return await context.make_deferred_yieldable(get_sleep_deferred(seconds))
|
|
||||||
```
|
|
||||||
|
|
||||||
## Fire-and-forget
|
## Fire-and-forget
|
||||||
|
|
||||||
Sometimes you want to fire off a chain of execution, but not wait for
|
To drive an awaitable in the background, you can use `context.run_in_background`:
|
||||||
its result. That might look a bit like this:
|
|
||||||
|
|
||||||
```python
|
|
||||||
async def do_request_handling():
|
|
||||||
await foreground_operation()
|
|
||||||
|
|
||||||
# *don't* do this
|
|
||||||
background_operation()
|
|
||||||
|
|
||||||
logger.debug("Request handling complete")
|
|
||||||
|
|
||||||
async def background_operation():
|
|
||||||
await first_background_step()
|
|
||||||
logger.debug("Completed first step")
|
|
||||||
await second_background_step()
|
|
||||||
logger.debug("Completed second step")
|
|
||||||
```
|
|
||||||
|
|
||||||
The above code does a couple of steps in the background after
|
|
||||||
`do_request_handling` has finished. The log lines are still logged
|
|
||||||
against the `request_context` logcontext, which may or may not be
|
|
||||||
desirable. There are two big problems with the above, however. The first
|
|
||||||
problem is that, if `background_operation` returns an incomplete
|
|
||||||
awaitable, it will expect its caller to `await` immediately, so will have
|
|
||||||
cleared the logcontext. In this example, that means that 'Request
|
|
||||||
handling complete' will be logged without any context.
|
|
||||||
|
|
||||||
The second problem, which is potentially even worse, is that when the
|
|
||||||
awaitable returned by `background_operation` completes, it will restore
|
|
||||||
the original logcontext. There is nothing waiting on that awaitable, so
|
|
||||||
the logcontext will leak into the reactor and possibly get attached to
|
|
||||||
some arbitrary future operation.
|
|
||||||
|
|
||||||
There are two potential solutions to this.
|
|
||||||
|
|
||||||
One option is to surround the call to `background_operation` with a
|
|
||||||
`PreserveLoggingContext` call. That will reset the logcontext before
|
|
||||||
starting `background_operation` (so the context restored when the
|
|
||||||
deferred completes will be the empty logcontext), and will restore the
|
|
||||||
current logcontext before continuing the foreground process:
|
|
||||||
|
|
||||||
```python
|
|
||||||
async def do_request_handling():
|
|
||||||
await foreground_operation()
|
|
||||||
|
|
||||||
# start background_operation off in the empty logcontext, to
|
|
||||||
# avoid leaking the current context into the reactor.
|
|
||||||
with PreserveLoggingContext():
|
|
||||||
background_operation()
|
|
||||||
|
|
||||||
# this will now be logged against the request context
|
|
||||||
logger.debug("Request handling complete")
|
|
||||||
```
|
|
||||||
|
|
||||||
Obviously that option means that the operations done in
|
|
||||||
`background_operation` would be not be logged against a logcontext
|
|
||||||
(though that might be fixed by setting a different logcontext via a
|
|
||||||
`with LoggingContext(...)` in `background_operation`).
|
|
||||||
|
|
||||||
The second option is to use `context.run_in_background`, which wraps a
|
|
||||||
function so that it doesn't reset the logcontext even when it returns
|
|
||||||
an incomplete awaitable, and adds a callback to the returned awaitable to
|
|
||||||
reset the logcontext. In other words, it turns a function that follows
|
|
||||||
the Synapse rules about logcontexts and awaitables into one which behaves
|
|
||||||
more like an external function --- the opposite operation to that
|
|
||||||
described in the previous section. It can be used like this:
|
|
||||||
|
|
||||||
```python
|
```python
|
||||||
async def do_request_handling():
|
async def do_request_handling():
|
||||||
@@ -261,104 +133,13 @@ async def do_request_handling():
|
|||||||
logger.debug("Request handling complete")
|
logger.debug("Request handling complete")
|
||||||
```
|
```
|
||||||
|
|
||||||
## Passing synapse deferreds into third-party functions
|
|
||||||
|
|
||||||
A typical example of this is where we want to collect together two or
|
|
||||||
more awaitables via `defer.gatherResults`:
|
|
||||||
|
|
||||||
```python
|
|
||||||
a1 = operation1()
|
|
||||||
a2 = operation2()
|
|
||||||
a3 = defer.gatherResults([a1, a2])
|
|
||||||
```
|
|
||||||
|
|
||||||
This is really a variation of the fire-and-forget problem above, in that
|
|
||||||
we are firing off `a1` and `a2` without awaiting on them. The difference
|
|
||||||
is that we now have third-party code attached to their callbacks. Anyway
|
|
||||||
either technique given in the [Fire-and-forget](#fire-and-forget)
|
|
||||||
section will work.
|
|
||||||
|
|
||||||
Of course, the new awaitable returned by `gather` needs to be
|
|
||||||
wrapped in order to make it follow the logcontext rules before we can
|
|
||||||
yield it, as described in [Where you create a new awaitable, make it
|
|
||||||
follow the
|
|
||||||
rules](#where-you-create-a-new-awaitable-make-it-follow-the-rules).
|
|
||||||
|
|
||||||
So, option one: reset the logcontext before starting the operations to
|
|
||||||
be gathered:
|
|
||||||
|
|
||||||
```python
|
|
||||||
async def do_request_handling():
|
|
||||||
with PreserveLoggingContext():
|
|
||||||
a1 = operation1()
|
|
||||||
a2 = operation2()
|
|
||||||
result = await defer.gatherResults([a1, a2])
|
|
||||||
```
|
|
||||||
|
|
||||||
In this case particularly, though, option two, of using
|
|
||||||
`context.run_in_background` almost certainly makes more sense, so that
|
|
||||||
`operation1` and `operation2` are both logged against the original
|
|
||||||
logcontext. This looks like:
|
|
||||||
|
|
||||||
```python
|
```python
|
||||||
async def do_request_handling():
|
async def do_request_handling():
|
||||||
a1 = context.run_in_background(operation1)
|
a1 = context.run_in_background(operation1)
|
||||||
a2 = context.run_in_background(operation2)
|
a2 = context.run_in_background(operation2)
|
||||||
|
|
||||||
result = await make_deferred_yieldable(defer.gatherResults([a1, a2]))
|
result = await defer.gatherResults([a1, a2])
|
||||||
```
|
```
|
||||||
|
|
||||||
## A note on garbage-collection of awaitable chains
|
`background_process_metrics.run_as_background_process` also exists if you want some
|
||||||
|
automatic tracing and metrics for the background task.
|
||||||
It turns out that our logcontext rules do not play nicely with awaitable
|
|
||||||
chains which get orphaned and garbage-collected.
|
|
||||||
|
|
||||||
Imagine we have some code that looks like this:
|
|
||||||
|
|
||||||
```python
|
|
||||||
listener_queue = []
|
|
||||||
|
|
||||||
def on_something_interesting():
|
|
||||||
for d in listener_queue:
|
|
||||||
d.callback("foo")
|
|
||||||
|
|
||||||
async def await_something_interesting():
|
|
||||||
new_awaitable = defer.Deferred()
|
|
||||||
listener_queue.append(new_awaitable)
|
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
|
||||||
await new_awaitable
|
|
||||||
```
|
|
||||||
|
|
||||||
Obviously, the idea here is that we have a bunch of things which are
|
|
||||||
waiting for an event. (It's just an example of the problem here, but a
|
|
||||||
relatively common one.)
|
|
||||||
|
|
||||||
Now let's imagine two further things happen. First of all, whatever was
|
|
||||||
waiting for the interesting thing goes away. (Perhaps the request times
|
|
||||||
out, or something *even more* interesting happens.)
|
|
||||||
|
|
||||||
Secondly, let's suppose that we decide that the interesting thing is
|
|
||||||
never going to happen, and we reset the listener queue:
|
|
||||||
|
|
||||||
```python
|
|
||||||
def reset_listener_queue():
|
|
||||||
listener_queue.clear()
|
|
||||||
```
|
|
||||||
|
|
||||||
So, both ends of the awaitable chain have now dropped their references,
|
|
||||||
and the awaitable chain is now orphaned, and will be garbage-collected at
|
|
||||||
some point. Note that `await_something_interesting` is a coroutine,
|
|
||||||
which Python implements as a generator function. When Python
|
|
||||||
garbage-collects generator functions, it gives them a chance to
|
|
||||||
clean up by making the `await` (or `yield`) raise a `GeneratorExit`
|
|
||||||
exception. In our case, that means that the `__exit__` handler of
|
|
||||||
`PreserveLoggingContext` will carefully restore the request context, but
|
|
||||||
there is now nothing waiting for its return, so the request context is
|
|
||||||
never cleared.
|
|
||||||
|
|
||||||
To reiterate, this problem only arises when *both* ends of a awaitable
|
|
||||||
chain are dropped. Dropping the the reference to an awaitable you're
|
|
||||||
supposed to be awaiting is bad practice, so this doesn't
|
|
||||||
actually happen too much. Unfortunately, when it does happen, it will
|
|
||||||
lead to leaked logcontexts which are incredibly hard to track down.
|
|
||||||
|
|||||||
@@ -86,12 +86,5 @@ import synapse.util # noqa: E402
|
|||||||
|
|
||||||
__version__ = synapse.util.SYNAPSE_VERSION
|
__version__ = synapse.util.SYNAPSE_VERSION
|
||||||
|
|
||||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
|
||||||
# We import here so that we don't have to install a bunch of deps when
|
|
||||||
# running the packaging tox test.
|
|
||||||
from synapse.util.patch_inline_callbacks import do_patch
|
|
||||||
|
|
||||||
do_patch()
|
|
||||||
|
|
||||||
|
|
||||||
check_rust_lib_up_to_date()
|
check_rust_lib_up_to_date()
|
||||||
|
|||||||
@@ -601,6 +601,12 @@ async def start(hs: "HomeServer") -> None:
|
|||||||
hs.get_datastores().main.db_pool.start_profiling()
|
hs.get_datastores().main.db_pool.start_profiling()
|
||||||
hs.get_pusherpool().start()
|
hs.get_pusherpool().start()
|
||||||
|
|
||||||
|
# Register background tasks required by this server. This must be done
|
||||||
|
# somewhat manually due to the background tasks not being registered
|
||||||
|
# unless handlers are instantiated.
|
||||||
|
if hs.config.worker.run_background_tasks:
|
||||||
|
hs.start_background_tasks()
|
||||||
|
|
||||||
# Log when we start the shut down process.
|
# Log when we start the shut down process.
|
||||||
hs.get_reactor().addSystemEventTrigger(
|
hs.get_reactor().addSystemEventTrigger(
|
||||||
"before", "shutdown", logger.info, "Shutting down..."
|
"before", "shutdown", logger.info, "Shutting down..."
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ import logging
|
|||||||
import threading
|
import threading
|
||||||
import typing
|
import typing
|
||||||
import warnings
|
import warnings
|
||||||
|
from contextvars import ContextVar
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
@@ -653,13 +654,12 @@ class PreserveLoggingContext:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
_thread_local = threading.local()
|
_current_context: ContextVar[LoggingContextOrSentinel] = ContextVar("current_context")
|
||||||
_thread_local.current_context = SENTINEL_CONTEXT
|
|
||||||
|
|
||||||
|
|
||||||
def current_context() -> LoggingContextOrSentinel:
|
def current_context() -> LoggingContextOrSentinel:
|
||||||
"""Get the current logging context from thread local storage"""
|
"""Get the current logging context from thread local storage"""
|
||||||
return getattr(_thread_local, "current_context", SENTINEL_CONTEXT)
|
return _current_context.get(SENTINEL_CONTEXT)
|
||||||
|
|
||||||
|
|
||||||
def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel:
|
def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel:
|
||||||
@@ -680,7 +680,7 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe
|
|||||||
if current is not context:
|
if current is not context:
|
||||||
rusage = get_thread_resource_usage()
|
rusage = get_thread_resource_usage()
|
||||||
current.stop(rusage)
|
current.stop(rusage)
|
||||||
_thread_local.current_context = context
|
_current_context.set(context)
|
||||||
context.start(rusage)
|
context.start(rusage)
|
||||||
|
|
||||||
return current
|
return current
|
||||||
@@ -796,7 +796,6 @@ def run_in_background(
|
|||||||
CRITICAL error about an unhandled error will be logged without much
|
CRITICAL error about an unhandled error will be logged without much
|
||||||
indication about where it came from.
|
indication about where it came from.
|
||||||
"""
|
"""
|
||||||
current = current_context()
|
|
||||||
try:
|
try:
|
||||||
res = f(*args, **kwargs)
|
res = f(*args, **kwargs)
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -825,23 +824,6 @@ def run_in_background(
|
|||||||
# optimise out the messing about
|
# optimise out the messing about
|
||||||
return d
|
return d
|
||||||
|
|
||||||
# The function may have reset the context before returning, so
|
|
||||||
# we need to restore it now.
|
|
||||||
ctx = set_current_context(current)
|
|
||||||
|
|
||||||
# The original context will be restored when the deferred
|
|
||||||
# completes, but there is nothing waiting for it, so it will
|
|
||||||
# get leaked into the reactor or some other function which
|
|
||||||
# wasn't expecting it. We therefore need to reset the context
|
|
||||||
# here.
|
|
||||||
#
|
|
||||||
# (If this feels asymmetric, consider it this way: we are
|
|
||||||
# effectively forking a new thread of execution. We are
|
|
||||||
# probably currently within a ``with LoggingContext()`` block,
|
|
||||||
# which is supposed to have a single entry and exit point. But
|
|
||||||
# by spawning off another deferred, we are effectively
|
|
||||||
# adding a new exit point.)
|
|
||||||
d.addBoth(_set_context_cb, ctx)
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
@@ -861,65 +843,20 @@ def run_coroutine_in_background(
|
|||||||
cannot change the log contexts.
|
cannot change the log contexts.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
current = current_context()
|
return defer.ensureDeferred(coroutine)
|
||||||
d = defer.ensureDeferred(coroutine)
|
|
||||||
|
|
||||||
# The function may have reset the context before returning, so
|
|
||||||
# we need to restore it now.
|
|
||||||
ctx = set_current_context(current)
|
|
||||||
|
|
||||||
# The original context will be restored when the deferred
|
|
||||||
# completes, but there is nothing waiting for it, so it will
|
|
||||||
# get leaked into the reactor or some other function which
|
|
||||||
# wasn't expecting it. We therefore need to reset the context
|
|
||||||
# here.
|
|
||||||
#
|
|
||||||
# (If this feels asymmetric, consider it this way: we are
|
|
||||||
# effectively forking a new thread of execution. We are
|
|
||||||
# probably currently within a ``with LoggingContext()`` block,
|
|
||||||
# which is supposed to have a single entry and exit point. But
|
|
||||||
# by spawning off another deferred, we are effectively
|
|
||||||
# adding a new exit point.)
|
|
||||||
d.addBoth(_set_context_cb, ctx)
|
|
||||||
return d
|
|
||||||
|
|
||||||
|
|
||||||
T = TypeVar("T")
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: This function is a no-op now and should be removed in a follow-up PR.
|
||||||
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
|
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
|
||||||
"""Given a deferred, make it follow the Synapse logcontext rules:
|
|
||||||
|
|
||||||
If the deferred has completed, essentially does nothing (just returns another
|
|
||||||
completed deferred with the result/failure).
|
|
||||||
|
|
||||||
If the deferred has not yet completed, resets the logcontext before
|
|
||||||
returning a deferred. Then, when the deferred completes, restores the
|
|
||||||
current logcontext before running callbacks/errbacks.
|
|
||||||
|
|
||||||
(This is more-or-less the opposite operation to run_in_background.)
|
|
||||||
"""
|
|
||||||
if deferred.called and not deferred.paused:
|
|
||||||
# it looks like this deferred is ready to run any callbacks we give it
|
|
||||||
# immediately. We may as well optimise out the logcontext faffery.
|
|
||||||
return deferred
|
|
||||||
|
|
||||||
# ok, we can't be sure that a yield won't block, so let's reset the
|
|
||||||
# logcontext, and add a callback to the deferred to restore it.
|
|
||||||
prev_context = set_current_context(SENTINEL_CONTEXT)
|
|
||||||
deferred.addBoth(_set_context_cb, prev_context)
|
|
||||||
return deferred
|
return deferred
|
||||||
|
|
||||||
|
|
||||||
ResultT = TypeVar("ResultT")
|
ResultT = TypeVar("ResultT")
|
||||||
|
|
||||||
|
|
||||||
def _set_context_cb(result: ResultT, context: LoggingContextOrSentinel) -> ResultT:
|
|
||||||
"""A callback function which just sets the logging context"""
|
|
||||||
set_current_context(context)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def defer_to_thread(
|
def defer_to_thread(
|
||||||
reactor: "ISynapseReactor", f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
|
reactor: "ISynapseReactor", f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
|
||||||
) -> "defer.Deferred[R]":
|
) -> "defer.Deferred[R]":
|
||||||
@@ -931,9 +868,6 @@ def defer_to_thread(
|
|||||||
logcontext (so its CPU usage metrics will get attributed to the current
|
logcontext (so its CPU usage metrics will get attributed to the current
|
||||||
logcontext). `f` should preserve the logcontext it is given.
|
logcontext). `f` should preserve the logcontext it is given.
|
||||||
|
|
||||||
The result deferred follows the Synapse logcontext rules: you should `yield`
|
|
||||||
on it.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
reactor: The reactor in whose main thread the Deferred will be invoked,
|
reactor: The reactor in whose main thread the Deferred will be invoked,
|
||||||
and whose threadpool we should use for the function.
|
and whose threadpool we should use for the function.
|
||||||
@@ -971,9 +905,6 @@ def defer_to_threadpool(
|
|||||||
logcontext (so its CPU usage metrics will get attributed to the current
|
logcontext (so its CPU usage metrics will get attributed to the current
|
||||||
logcontext). `f` should preserve the logcontext it is given.
|
logcontext). `f` should preserve the logcontext it is given.
|
||||||
|
|
||||||
The result deferred follows the Synapse logcontext rules: you should `yield`
|
|
||||||
on it.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
reactor: The reactor in whose main thread the Deferred will be invoked.
|
reactor: The reactor in whose main thread the Deferred will be invoked.
|
||||||
Normally this will be hs.get_reactor().
|
Normally this will be hs.get_reactor().
|
||||||
@@ -991,18 +922,6 @@ def defer_to_threadpool(
|
|||||||
A Deferred which fires a callback with the result of `f`, or an
|
A Deferred which fires a callback with the result of `f`, or an
|
||||||
errback if `f` throws an exception.
|
errback if `f` throws an exception.
|
||||||
"""
|
"""
|
||||||
curr_context = current_context()
|
return make_deferred_yieldable(
|
||||||
if not curr_context:
|
threads.deferToThreadPool(reactor, threadpool, f, *args, **kwargs)
|
||||||
logger.warning(
|
|
||||||
"Calling defer_to_threadpool from sentinel context: metrics will be lost"
|
|
||||||
)
|
)
|
||||||
parent_context = None
|
|
||||||
else:
|
|
||||||
assert isinstance(curr_context, LoggingContext)
|
|
||||||
parent_context = curr_context
|
|
||||||
|
|
||||||
def g() -> R:
|
|
||||||
with LoggingContext(str(curr_context), parent_context=parent_context):
|
|
||||||
return f(*args, **kwargs)
|
|
||||||
|
|
||||||
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
|
|
||||||
|
|||||||
@@ -223,10 +223,9 @@ def run_as_background_process(
|
|||||||
This should be used to wrap processes which are fired off to run in the
|
This should be used to wrap processes which are fired off to run in the
|
||||||
background, instead of being associated with a particular request.
|
background, instead of being associated with a particular request.
|
||||||
|
|
||||||
It returns a Deferred which completes when the function completes, but it doesn't
|
It returns a Deferred which completes when the function completes, which makes it
|
||||||
follow the synapse logcontext rules, which makes it appropriate for passing to
|
appropriate for passing to clock.looping_call and friends (or for
|
||||||
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
|
firing-and-forgetting in the middle of a normal synapse async function).
|
||||||
normal synapse async function).
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
desc: a description for this background process type
|
desc: a description for this background process type
|
||||||
@@ -241,8 +240,6 @@ def run_as_background_process(
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred which returns the result of func, or `None` if func raises.
|
Deferred which returns the result of func, or `None` if func raises.
|
||||||
Note that the returned Deferred does not follow the synapse logcontext
|
|
||||||
rules.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
async def run() -> Optional[R]:
|
async def run() -> Optional[R]:
|
||||||
|
|||||||
@@ -237,10 +237,9 @@ def run_as_background_process(
|
|||||||
This should be used to wrap processes which are fired off to run in the
|
This should be used to wrap processes which are fired off to run in the
|
||||||
background, instead of being associated with a particular request.
|
background, instead of being associated with a particular request.
|
||||||
|
|
||||||
It returns a Deferred which completes when the function completes, but it doesn't
|
It returns a Deferred which completes when the function completes, which makes it
|
||||||
follow the synapse logcontext rules, which makes it appropriate for passing to
|
appropriate for passing to clock.looping_call and friends (or for
|
||||||
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
|
firing-and-forgetting in the middle of a normal synapse async function).
|
||||||
normal synapse async function).
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
desc: a description for this background process type
|
desc: a description for this background process type
|
||||||
@@ -255,8 +254,6 @@ def run_as_background_process(
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred which returns the result of func, or `None` if func raises.
|
Deferred which returns the result of func, or `None` if func raises.
|
||||||
Note that the returned Deferred does not follow the synapse logcontext
|
|
||||||
rules.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
logger.warning(
|
logger.warning(
|
||||||
@@ -1375,9 +1372,7 @@ class ModuleApi:
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
f: The function to call repeatedly. f can be either synchronous or
|
f: The function to call repeatedly. f can be either synchronous or
|
||||||
asynchronous, and must follow Synapse's logcontext rules.
|
asynchronous.
|
||||||
More info about logcontexts is available at
|
|
||||||
https://element-hq.github.io/synapse/latest/log_contexts.html
|
|
||||||
msec: How long to wait between calls in milliseconds.
|
msec: How long to wait between calls in milliseconds.
|
||||||
*args: Positional arguments to pass to function.
|
*args: Positional arguments to pass to function.
|
||||||
desc: The background task's description. Default to the function's name.
|
desc: The background task's description. Default to the function's name.
|
||||||
@@ -1431,9 +1426,7 @@ class ModuleApi:
|
|||||||
Args:
|
Args:
|
||||||
msec: How long to wait before calling, in milliseconds.
|
msec: How long to wait before calling, in milliseconds.
|
||||||
f: The function to call once. f can be either synchronous or
|
f: The function to call once. f can be either synchronous or
|
||||||
asynchronous, and must follow Synapse's logcontext rules.
|
asynchronous.
|
||||||
More info about logcontexts is available at
|
|
||||||
https://element-hq.github.io/synapse/latest/log_contexts.html
|
|
||||||
*args: Positional arguments to pass to function.
|
*args: Positional arguments to pass to function.
|
||||||
desc: The background task's description. Default to the function's name.
|
desc: The background task's description. Default to the function's name.
|
||||||
**kwargs: Keyword arguments to pass to function.
|
**kwargs: Keyword arguments to pass to function.
|
||||||
@@ -1668,10 +1661,9 @@ class ModuleApi:
|
|||||||
This should be used to wrap processes which are fired off to run in the
|
This should be used to wrap processes which are fired off to run in the
|
||||||
background, instead of being associated with a particular request.
|
background, instead of being associated with a particular request.
|
||||||
|
|
||||||
It returns a Deferred which completes when the function completes, but it doesn't
|
It returns a Deferred which completes when the function completes, which makes
|
||||||
follow the synapse logcontext rules, which makes it appropriate for passing to
|
it appropriate for passing to clock.looping_call and friends (or for
|
||||||
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
|
firing-and-forgetting in the middle of a normal synapse async function).
|
||||||
normal synapse async function).
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
desc: a description for this background process type
|
desc: a description for this background process type
|
||||||
@@ -1686,8 +1678,6 @@ class ModuleApi:
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred which returns the result of func, or `None` if func raises.
|
Deferred which returns the result of func, or `None` if func raises.
|
||||||
Note that the returned Deferred does not follow the synapse logcontext
|
|
||||||
rules.
|
|
||||||
"""
|
"""
|
||||||
return _run_as_background_process(
|
return _run_as_background_process(
|
||||||
desc, self.server_name, func, *args, bg_start_span=bg_start_span, **kwargs
|
desc, self.server_name, func, *args, bg_start_span=bg_start_span, **kwargs
|
||||||
|
|||||||
@@ -366,12 +366,6 @@ class HomeServer(metaclass=abc.ABCMeta):
|
|||||||
self.datastores = Databases(self.DATASTORE_CLASS, self)
|
self.datastores = Databases(self.DATASTORE_CLASS, self)
|
||||||
logger.info("Finished setting up.")
|
logger.info("Finished setting up.")
|
||||||
|
|
||||||
# Register background tasks required by this server. This must be done
|
|
||||||
# somewhat manually due to the background tasks not being registered
|
|
||||||
# unless handlers are instantiated.
|
|
||||||
if self.config.worker.run_background_tasks:
|
|
||||||
self.setup_background_tasks()
|
|
||||||
|
|
||||||
def __del__(self) -> None:
|
def __del__(self) -> None:
|
||||||
"""
|
"""
|
||||||
Called when an the homeserver is garbage collected.
|
Called when an the homeserver is garbage collected.
|
||||||
@@ -410,7 +404,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
|||||||
appropriate listeners.
|
appropriate listeners.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def setup_background_tasks(self) -> None:
|
def start_background_tasks(self) -> None:
|
||||||
"""
|
"""
|
||||||
Some handlers have side effects on instantiation (like registering
|
Some handlers have side effects on instantiation (like registering
|
||||||
background updates). This function causes them to be fetched, and
|
background updates). This function causes them to be fetched, and
|
||||||
|
|||||||
@@ -84,9 +84,6 @@ class AbstractObservableDeferred(Generic[_T], metaclass=abc.ABCMeta):
|
|||||||
This returns a brand new deferred that is resolved when the underlying
|
This returns a brand new deferred that is resolved when the underlying
|
||||||
deferred is resolved. Interacting with the returned deferred does not
|
deferred is resolved. Interacting with the returned deferred does not
|
||||||
effect the underlying deferred.
|
effect the underlying deferred.
|
||||||
|
|
||||||
Note that the returned Deferred doesn't follow the Synapse logcontext rules -
|
|
||||||
you will probably want to `make_deferred_yieldable` it.
|
|
||||||
"""
|
"""
|
||||||
...
|
...
|
||||||
|
|
||||||
@@ -100,11 +97,6 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
|
|||||||
|
|
||||||
Cancelling or otherwise resolving an observer will not affect the original
|
Cancelling or otherwise resolving an observer will not affect the original
|
||||||
ObservableDeferred.
|
ObservableDeferred.
|
||||||
|
|
||||||
NB that it does not attempt to do anything with logcontexts; in general
|
|
||||||
you should probably make_deferred_yieldable the deferreds
|
|
||||||
returned by `observe`, and ensure that the original deferred runs its
|
|
||||||
callbacks in the sentinel logcontext.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
__slots__ = ["_deferred", "_observers", "_result"]
|
__slots__ = ["_deferred", "_observers", "_result"]
|
||||||
@@ -861,16 +853,12 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
|
|||||||
"""Prevent a `Deferred` from being cancelled by wrapping it in another `Deferred`.
|
"""Prevent a `Deferred` from being cancelled by wrapping it in another `Deferred`.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
deferred: The `Deferred` to protect against cancellation. Must not follow the
|
deferred: The `Deferred` to protect against cancellation.
|
||||||
Synapse logcontext rules.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A new `Deferred`, which will contain the result of the original `Deferred`.
|
A new `Deferred`, which will contain the result of the original `Deferred`.
|
||||||
The new `Deferred` will not propagate cancellation through to the original.
|
The new `Deferred` will not propagate cancellation through to the original.
|
||||||
When cancelled, the new `Deferred` will fail with a `CancelledError`.
|
When cancelled, the new `Deferred` will fail with a `CancelledError`.
|
||||||
|
|
||||||
The new `Deferred` will not follow the Synapse logcontext rules and should be
|
|
||||||
wrapped with `make_deferred_yieldable`.
|
|
||||||
"""
|
"""
|
||||||
new_deferred: "defer.Deferred[T]" = defer.Deferred()
|
new_deferred: "defer.Deferred[T]" = defer.Deferred()
|
||||||
deferred.chainDeferred(new_deferred)
|
deferred.chainDeferred(new_deferred)
|
||||||
@@ -896,8 +884,7 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
|
|||||||
resolve with a `CancelledError` until the original awaitable resolves.
|
resolve with a `CancelledError` until the original awaitable resolves.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
deferred: The coroutine or `Deferred` to protect against cancellation. May
|
deferred: The coroutine or `Deferred` to protect against cancellation.
|
||||||
optionally follow the Synapse logcontext rules.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A new `Deferred`, which will contain the result of the original coroutine or
|
A new `Deferred`, which will contain the result of the original coroutine or
|
||||||
@@ -906,10 +893,6 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
|
|||||||
|
|
||||||
When cancelled, the new `Deferred` will wait until the original coroutine or
|
When cancelled, the new `Deferred` will wait until the original coroutine or
|
||||||
`Deferred` resolves before failing with a `CancelledError`.
|
`Deferred` resolves before failing with a `CancelledError`.
|
||||||
|
|
||||||
The new `Deferred` will follow the Synapse logcontext rules if `awaitable`
|
|
||||||
follows the Synapse logcontext rules. Otherwise the new `Deferred` should be
|
|
||||||
wrapped with `make_deferred_yieldable`.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# First, convert the awaitable into a `Deferred`.
|
# First, convert the awaitable into a `Deferred`.
|
||||||
|
|||||||
@@ -295,9 +295,6 @@ class DeferredCache(Generic[KT, VT]):
|
|||||||
*original* `value`, (c) any future calls to `get()` will complete with the
|
*original* `value`, (c) any future calls to `get()` will complete with the
|
||||||
result from the *new* `value`.
|
result from the *new* `value`.
|
||||||
|
|
||||||
It is expected that `value` does *not* follow the synapse logcontext rules - ie,
|
|
||||||
if it is incomplete, it runs its callbacks in the sentinel context.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
key: Key to be set
|
key: Key to be set
|
||||||
value: a deferred which will complete with a result to add to the cache
|
value: a deferred which will complete with a result to add to the cache
|
||||||
|
|||||||
@@ -234,11 +234,9 @@ class ResponseCache(Generic[KV]):
|
|||||||
) -> RV:
|
) -> RV:
|
||||||
"""Wrap together a *get* and *set* call, taking care of logcontexts
|
"""Wrap together a *get* and *set* call, taking care of logcontexts
|
||||||
|
|
||||||
First looks up the key in the cache, and if it is present makes it
|
First looks up the key in the cache, and if present, returns it.
|
||||||
follow the synapse logcontext rules and returns it.
|
|
||||||
|
|
||||||
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
|
Otherwise, makes a call to *callback(*args, **kwargs)* and adds the result to the cache.
|
||||||
follow the synapse logcontext rules, and adds the result to the cache.
|
|
||||||
|
|
||||||
Example usage:
|
Example usage:
|
||||||
|
|
||||||
|
|||||||
@@ -1,250 +0,0 @@
|
|||||||
#
|
|
||||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
||||||
#
|
|
||||||
# Copyright (C) 2023 New Vector, Ltd
|
|
||||||
#
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU Affero General Public License as
|
|
||||||
# published by the Free Software Foundation, either version 3 of the
|
|
||||||
# License, or (at your option) any later version.
|
|
||||||
#
|
|
||||||
# See the GNU Affero General Public License for more details:
|
|
||||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
||||||
#
|
|
||||||
# Originally licensed under the Apache License, Version 2.0:
|
|
||||||
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
|
||||||
#
|
|
||||||
# [This file includes modifications made by New Vector Limited]
|
|
||||||
#
|
|
||||||
#
|
|
||||||
|
|
||||||
import functools
|
|
||||||
import sys
|
|
||||||
from types import GeneratorType
|
|
||||||
from typing import Any, Callable, Generator, List, TypeVar, cast
|
|
||||||
|
|
||||||
from typing_extensions import ParamSpec
|
|
||||||
|
|
||||||
from twisted.internet import defer
|
|
||||||
from twisted.internet.defer import Deferred
|
|
||||||
from twisted.python.failure import Failure
|
|
||||||
|
|
||||||
# Tracks if we've already patched inlineCallbacks
|
|
||||||
_already_patched = False
|
|
||||||
|
|
||||||
|
|
||||||
T = TypeVar("T")
|
|
||||||
P = ParamSpec("P")
|
|
||||||
|
|
||||||
|
|
||||||
def do_patch() -> None:
|
|
||||||
"""
|
|
||||||
Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit
|
|
||||||
"""
|
|
||||||
|
|
||||||
from synapse.logging.context import current_context
|
|
||||||
|
|
||||||
global _already_patched
|
|
||||||
|
|
||||||
orig_inline_callbacks = defer.inlineCallbacks
|
|
||||||
if _already_patched:
|
|
||||||
return
|
|
||||||
|
|
||||||
def new_inline_callbacks(
|
|
||||||
f: Callable[P, Generator["Deferred[object]", object, T]],
|
|
||||||
) -> Callable[P, "Deferred[T]"]:
|
|
||||||
@functools.wraps(f)
|
|
||||||
def wrapped(*args: P.args, **kwargs: P.kwargs) -> "Deferred[T]":
|
|
||||||
start_context = current_context()
|
|
||||||
changes: List[str] = []
|
|
||||||
orig: Callable[P, "Deferred[T]"] = orig_inline_callbacks(
|
|
||||||
_check_yield_points(f, changes)
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
res: "Deferred[T]" = orig(*args, **kwargs)
|
|
||||||
except Exception:
|
|
||||||
if current_context() != start_context:
|
|
||||||
for err in changes:
|
|
||||||
print(err, file=sys.stderr)
|
|
||||||
|
|
||||||
err = "%s changed context from %s to %s on exception" % (
|
|
||||||
f,
|
|
||||||
start_context,
|
|
||||||
current_context(),
|
|
||||||
)
|
|
||||||
print(err, file=sys.stderr)
|
|
||||||
raise Exception(err)
|
|
||||||
raise
|
|
||||||
|
|
||||||
if not isinstance(res, Deferred) or res.called:
|
|
||||||
if current_context() != start_context:
|
|
||||||
for err in changes:
|
|
||||||
print(err, file=sys.stderr)
|
|
||||||
|
|
||||||
err = "Completed %s changed context from %s to %s" % (
|
|
||||||
f,
|
|
||||||
start_context,
|
|
||||||
current_context(),
|
|
||||||
)
|
|
||||||
# print the error to stderr because otherwise all we
|
|
||||||
# see in travis-ci is the 500 error
|
|
||||||
print(err, file=sys.stderr)
|
|
||||||
raise Exception(err)
|
|
||||||
return res
|
|
||||||
|
|
||||||
if current_context():
|
|
||||||
err = (
|
|
||||||
"%s returned incomplete deferred in non-sentinel context "
|
|
||||||
"%s (start was %s)"
|
|
||||||
) % (f, current_context(), start_context)
|
|
||||||
print(err, file=sys.stderr)
|
|
||||||
raise Exception(err)
|
|
||||||
|
|
||||||
def check_ctx(r: T) -> T:
|
|
||||||
if current_context() != start_context:
|
|
||||||
for err in changes:
|
|
||||||
print(err, file=sys.stderr)
|
|
||||||
err = "%s completion of %s changed context from %s to %s" % (
|
|
||||||
"Failure" if isinstance(r, Failure) else "Success",
|
|
||||||
f,
|
|
||||||
start_context,
|
|
||||||
current_context(),
|
|
||||||
)
|
|
||||||
print(err, file=sys.stderr)
|
|
||||||
raise Exception(err)
|
|
||||||
return r
|
|
||||||
|
|
||||||
res.addBoth(check_ctx)
|
|
||||||
return res
|
|
||||||
|
|
||||||
return wrapped
|
|
||||||
|
|
||||||
defer.inlineCallbacks = new_inline_callbacks
|
|
||||||
_already_patched = True
|
|
||||||
|
|
||||||
|
|
||||||
def _check_yield_points(
|
|
||||||
f: Callable[P, Generator["Deferred[object]", object, T]],
|
|
||||||
changes: List[str],
|
|
||||||
) -> Callable:
|
|
||||||
"""Wraps a generator that is about to be passed to defer.inlineCallbacks
|
|
||||||
checking that after every yield the log contexts are correct.
|
|
||||||
|
|
||||||
It's perfectly valid for log contexts to change within a function, e.g. due
|
|
||||||
to new Measure blocks, so such changes are added to the given `changes`
|
|
||||||
list instead of triggering an exception.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
f: generator function to wrap
|
|
||||||
changes: A list of strings detailing how the contexts
|
|
||||||
changed within a function.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
function
|
|
||||||
"""
|
|
||||||
|
|
||||||
from synapse.logging.context import current_context
|
|
||||||
|
|
||||||
@functools.wraps(f)
|
|
||||||
def check_yield_points_inner(
|
|
||||||
*args: P.args, **kwargs: P.kwargs
|
|
||||||
) -> Generator["Deferred[object]", object, T]:
|
|
||||||
gen = f(*args, **kwargs)
|
|
||||||
|
|
||||||
# We only patch if we have a native generator function, as we rely on
|
|
||||||
# `gen.gi_frame`.
|
|
||||||
if not isinstance(gen, GeneratorType):
|
|
||||||
ret = yield from gen
|
|
||||||
return ret
|
|
||||||
|
|
||||||
last_yield_line_no = gen.gi_frame.f_lineno
|
|
||||||
result: Any = None
|
|
||||||
while True:
|
|
||||||
expected_context = current_context()
|
|
||||||
|
|
||||||
try:
|
|
||||||
isFailure = isinstance(result, Failure)
|
|
||||||
if isFailure:
|
|
||||||
d = result.throwExceptionIntoGenerator(gen)
|
|
||||||
else:
|
|
||||||
d = gen.send(result)
|
|
||||||
except StopIteration as e:
|
|
||||||
if current_context() != expected_context:
|
|
||||||
# This happens when the context is lost sometime *after* the
|
|
||||||
# final yield and returning. E.g. we forgot to yield on a
|
|
||||||
# function that returns a deferred.
|
|
||||||
#
|
|
||||||
# We don't raise here as it's perfectly valid for contexts to
|
|
||||||
# change in a function, as long as it sets the correct context
|
|
||||||
# on resolving (which is checked separately).
|
|
||||||
err = (
|
|
||||||
"Function %r returned and changed context from %s to %s,"
|
|
||||||
" in %s between %d and end of func"
|
|
||||||
% (
|
|
||||||
f.__qualname__,
|
|
||||||
expected_context,
|
|
||||||
current_context(),
|
|
||||||
f.__code__.co_filename,
|
|
||||||
last_yield_line_no,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
changes.append(err)
|
|
||||||
# The `StopIteration` contains the return value from the
|
|
||||||
# generator.
|
|
||||||
return cast(T, e.value)
|
|
||||||
|
|
||||||
frame = gen.gi_frame
|
|
||||||
|
|
||||||
if isinstance(d, defer.Deferred) and not d.called:
|
|
||||||
# This happens if we yield on a deferred that doesn't follow
|
|
||||||
# the log context rules without wrapping in a `make_deferred_yieldable`.
|
|
||||||
# We raise here as this should never happen.
|
|
||||||
if current_context():
|
|
||||||
err = (
|
|
||||||
"%s yielded with context %s rather than sentinel,"
|
|
||||||
" yielded on line %d in %s"
|
|
||||||
% (
|
|
||||||
frame.f_code.co_name,
|
|
||||||
current_context(),
|
|
||||||
frame.f_lineno,
|
|
||||||
frame.f_code.co_filename,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
raise Exception(err)
|
|
||||||
|
|
||||||
# the wrapped function yielded a Deferred: yield it back up to the parent
|
|
||||||
# inlineCallbacks().
|
|
||||||
try:
|
|
||||||
result = yield d
|
|
||||||
except Exception:
|
|
||||||
# this will fish an earlier Failure out of the stack where possible, and
|
|
||||||
# thus is preferable to passing in an exception to the Failure
|
|
||||||
# constructor, since it results in less stack-mangling.
|
|
||||||
result = Failure()
|
|
||||||
|
|
||||||
if current_context() != expected_context:
|
|
||||||
# This happens because the context is lost sometime *after* the
|
|
||||||
# previous yield and *after* the current yield. E.g. the
|
|
||||||
# deferred we waited on didn't follow the rules, or we forgot to
|
|
||||||
# yield on a function between the two yield points.
|
|
||||||
#
|
|
||||||
# We don't raise here as its perfectly valid for contexts to
|
|
||||||
# change in a function, as long as it sets the correct context
|
|
||||||
# on resolving (which is checked separately).
|
|
||||||
err = (
|
|
||||||
"%s changed context from %s to %s, happened between lines %d and %d in %s"
|
|
||||||
% (
|
|
||||||
frame.f_code.co_name,
|
|
||||||
expected_context,
|
|
||||||
current_context(),
|
|
||||||
last_yield_line_no,
|
|
||||||
frame.f_lineno,
|
|
||||||
frame.f_code.co_filename,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
changes.append(err)
|
|
||||||
|
|
||||||
last_yield_line_no = frame.f_lineno
|
|
||||||
|
|
||||||
return check_yield_points_inner
|
|
||||||
@@ -21,9 +21,4 @@
|
|||||||
|
|
||||||
from twisted.trial import util
|
from twisted.trial import util
|
||||||
|
|
||||||
from synapse.util.patch_inline_callbacks import do_patch
|
|
||||||
|
|
||||||
# attempt to do the patch before we load any synapse code
|
|
||||||
do_patch()
|
|
||||||
|
|
||||||
util.DEFAULT_TIMEOUT_DURATION = 20
|
util.DEFAULT_TIMEOUT_DURATION = 20
|
||||||
|
|||||||
@@ -355,7 +355,7 @@ class DescriptorTestCase(unittest.TestCase):
|
|||||||
d = obj.fn(1)
|
d = obj.fn(1)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
current_context(),
|
current_context(),
|
||||||
SENTINEL_CONTEXT,
|
c1,
|
||||||
)
|
)
|
||||||
yield d
|
yield d
|
||||||
self.fail("No exception thrown")
|
self.fail("No exception thrown")
|
||||||
@@ -849,7 +849,7 @@ class CachedListDescriptorTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
# start the lookup off
|
# start the lookup off
|
||||||
d1 = obj.list_fn([10, 20], 2)
|
d1 = obj.list_fn([10, 20], 2)
|
||||||
self.assertEqual(current_context(), SENTINEL_CONTEXT)
|
self.assertEqual(current_context(), c1)
|
||||||
r = yield d1
|
r = yield d1
|
||||||
self.assertEqual(current_context(), c1)
|
self.assertEqual(current_context(), c1)
|
||||||
obj.mock.assert_called_once_with({10, 20}, 2)
|
obj.mock.assert_called_once_with({10, 20}, 2)
|
||||||
|
|||||||
Reference in New Issue
Block a user