|
|
|
@@ -227,7 +227,16 @@ LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _Sentinel:
|
|
|
|
class _Sentinel:
|
|
|
|
"""Sentinel to represent the root context"""
|
|
|
|
"""
|
|
|
|
|
|
|
|
Sentinel to represent the root context
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
This should only be used for tasks outside of Synapse like when we yield control
|
|
|
|
|
|
|
|
back to the Twisted reactor (event loop) so we don't leak the current logging
|
|
|
|
|
|
|
|
context to other tasks that are scheduled next in the event loop.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Nothing from the Synapse homeserver should be logged with the sentinel context. i.e.
|
|
|
|
|
|
|
|
we should always know which server the logs are coming from.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
__slots__ = ["previous_context", "finished", "request", "tag"]
|
|
|
|
__slots__ = ["previous_context", "finished", "request", "tag"]
|
|
|
|
|
|
|
|
|
|
|
|
@@ -616,9 +625,17 @@ class LoggingContextFilter(logging.Filter):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PreserveLoggingContext:
|
|
|
|
class PreserveLoggingContext:
|
|
|
|
"""Context manager which replaces the logging context
|
|
|
|
"""
|
|
|
|
|
|
|
|
Context manager which replaces the logging context
|
|
|
|
|
|
|
|
|
|
|
|
The previous logging context is restored on exit."""
|
|
|
|
The previous logging context is restored on exit.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
`make_deferred_yieldable` is pretty equivalent to using `with
|
|
|
|
|
|
|
|
PreserveLoggingContext():` (using the default sentinel context), i.e. it clears the
|
|
|
|
|
|
|
|
logcontext before awaiting (and so before execution passes back to the reactor) and
|
|
|
|
|
|
|
|
restores the old context once the awaitable completes (execution passes from the
|
|
|
|
|
|
|
|
reactor back to the code).
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
__slots__ = ["_old_context", "_new_context"]
|
|
|
|
__slots__ = ["_old_context", "_new_context"]
|
|
|
|
|
|
|
|
|
|
|
|
@@ -784,6 +801,14 @@ def run_in_background(
|
|
|
|
return from the function, and that the sentinel context is set once the
|
|
|
|
return from the function, and that the sentinel context is set once the
|
|
|
|
deferred returned by the function completes.
|
|
|
|
deferred returned by the function completes.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
To explain how the log contexts work here:
|
|
|
|
|
|
|
|
- When this function is called, the current context is stored ("original"), we kick
|
|
|
|
|
|
|
|
off the background task, and we restore that original context before returning
|
|
|
|
|
|
|
|
- When the background task finishes, we don't want to leak our context into the
|
|
|
|
|
|
|
|
reactor which would erroneously get attached to the next operation picked up by
|
|
|
|
|
|
|
|
the event loop. We add a callback to the deferred which will clear the logging
|
|
|
|
|
|
|
|
context after it finishes and yields control back to the reactor.
|
|
|
|
|
|
|
|
|
|
|
|
Useful for wrapping functions that return a deferred or coroutine, which you don't
|
|
|
|
Useful for wrapping functions that return a deferred or coroutine, which you don't
|
|
|
|
yield or await on (for instance because you want to pass it to
|
|
|
|
yield or await on (for instance because you want to pass it to
|
|
|
|
deferred.gatherResults()).
|
|
|
|
deferred.gatherResults()).
|
|
|
|
@@ -795,8 +820,13 @@ def run_in_background(
|
|
|
|
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
|
|
|
|
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
|
|
|
|
CRITICAL error about an unhandled error will be logged without much
|
|
|
|
CRITICAL error about an unhandled error will be logged without much
|
|
|
|
indication about where it came from.
|
|
|
|
indication about where it came from.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
|
|
Deferred which returns the result of func, or `None` if func raises.
|
|
|
|
|
|
|
|
Note that the returned Deferred does not follow the synapse logcontext
|
|
|
|
|
|
|
|
rules.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
current = current_context()
|
|
|
|
calling_context = current_context()
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
res = f(*args, **kwargs)
|
|
|
|
res = f(*args, **kwargs)
|
|
|
|
except Exception:
|
|
|
|
except Exception:
|
|
|
|
@@ -806,6 +836,9 @@ def run_in_background(
|
|
|
|
|
|
|
|
|
|
|
|
# `res` may be a coroutine, `Deferred`, some other kind of awaitable, or a plain
|
|
|
|
# `res` may be a coroutine, `Deferred`, some other kind of awaitable, or a plain
|
|
|
|
# value. Convert it to a `Deferred`.
|
|
|
|
# value. Convert it to a `Deferred`.
|
|
|
|
|
|
|
|
#
|
|
|
|
|
|
|
|
# Wrapping the value in a deferred has the side effect of executing the coroutine,
|
|
|
|
|
|
|
|
# if it is one. If it's already a deferred, then we can just use that.
|
|
|
|
d: "defer.Deferred[R]"
|
|
|
|
d: "defer.Deferred[R]"
|
|
|
|
if isinstance(res, typing.Coroutine):
|
|
|
|
if isinstance(res, typing.Coroutine):
|
|
|
|
# Wrap the coroutine in a `Deferred`.
|
|
|
|
# Wrap the coroutine in a `Deferred`.
|
|
|
|
@@ -820,20 +853,24 @@ def run_in_background(
|
|
|
|
# `res` is a plain value. Wrap it in a `Deferred`.
|
|
|
|
# `res` is a plain value. Wrap it in a `Deferred`.
|
|
|
|
d = defer.succeed(res)
|
|
|
|
d = defer.succeed(res)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# The deferred has already completed
|
|
|
|
if d.called and not d.paused:
|
|
|
|
if d.called and not d.paused:
|
|
|
|
# The function should have maintained the logcontext, so we can
|
|
|
|
# The function should have maintained the logcontext, so we can
|
|
|
|
# optimise out the messing about
|
|
|
|
# optimise out the messing about
|
|
|
|
return d
|
|
|
|
return d
|
|
|
|
|
|
|
|
|
|
|
|
# The function may have reset the context before returning, so
|
|
|
|
# The function may have reset the context before returning, so we need to restore it
|
|
|
|
# we need to restore it now.
|
|
|
|
# now.
|
|
|
|
ctx = set_current_context(current)
|
|
|
|
#
|
|
|
|
|
|
|
|
# Our goal is to have the caller logcontext unchanged after firing off the
|
|
|
|
|
|
|
|
# background task and returning.
|
|
|
|
|
|
|
|
set_current_context(calling_context)
|
|
|
|
|
|
|
|
|
|
|
|
# The original context will be restored when the deferred
|
|
|
|
# The original logcontext will be restored when the deferred completes, but
|
|
|
|
# completes, but there is nothing waiting for it, so it will
|
|
|
|
# there is nothing waiting for it, so it will get leaked into the reactor (which
|
|
|
|
# get leaked into the reactor or some other function which
|
|
|
|
# would then get picked up by the next thing the reactor does). We therefore
|
|
|
|
# wasn't expecting it. We therefore need to reset the context
|
|
|
|
# need to reset the logcontext here (set the `sentinel` logcontext) before
|
|
|
|
# here.
|
|
|
|
# yielding control back to the reactor.
|
|
|
|
#
|
|
|
|
#
|
|
|
|
# (If this feels asymmetric, consider it this way: we are
|
|
|
|
# (If this feels asymmetric, consider it this way: we are
|
|
|
|
# effectively forking a new thread of execution. We are
|
|
|
|
# effectively forking a new thread of execution. We are
|
|
|
|
@@ -841,7 +878,7 @@ def run_in_background(
|
|
|
|
# which is supposed to have a single entry and exit point. But
|
|
|
|
# which is supposed to have a single entry and exit point. But
|
|
|
|
# by spawning off another deferred, we are effectively
|
|
|
|
# by spawning off another deferred, we are effectively
|
|
|
|
# adding a new exit point.)
|
|
|
|
# adding a new exit point.)
|
|
|
|
d.addBoth(_set_context_cb, ctx)
|
|
|
|
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
|
|
|
|
return d
|
|
|
|
return d
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -859,20 +896,34 @@ def run_coroutine_in_background(
|
|
|
|
coroutine directly rather than a function. We can do this because coroutines
|
|
|
|
coroutine directly rather than a function. We can do this because coroutines
|
|
|
|
do not run until called, and so calling an async function without awaiting
|
|
|
|
do not run until called, and so calling an async function without awaiting
|
|
|
|
cannot change the log contexts.
|
|
|
|
cannot change the log contexts.
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
current = current_context()
|
|
|
|
This is an ergonomic helper so we can do this:
|
|
|
|
|
|
|
|
```python
|
|
|
|
|
|
|
|
run_coroutine_in_background(func1(arg1))
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
Rather than having to do this:
|
|
|
|
|
|
|
|
```python
|
|
|
|
|
|
|
|
run_in_background(lambda: func1(arg1))
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
calling_context = current_context()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Wrap the coroutine in a deferred, which will have the side effect of executing the
|
|
|
|
|
|
|
|
# coroutine in the background.
|
|
|
|
d = defer.ensureDeferred(coroutine)
|
|
|
|
d = defer.ensureDeferred(coroutine)
|
|
|
|
|
|
|
|
|
|
|
|
# The function may have reset the context before returning, so
|
|
|
|
# The function may have reset the context before returning, so we need to restore it
|
|
|
|
# we need to restore it now.
|
|
|
|
# now.
|
|
|
|
ctx = set_current_context(current)
|
|
|
|
#
|
|
|
|
|
|
|
|
# Our goal is to have the caller logcontext unchanged after firing off the
|
|
|
|
|
|
|
|
# background task and returning.
|
|
|
|
|
|
|
|
set_current_context(calling_context)
|
|
|
|
|
|
|
|
|
|
|
|
# The original context will be restored when the deferred
|
|
|
|
# The original logcontext will be restored when the deferred completes, but
|
|
|
|
# completes, but there is nothing waiting for it, so it will
|
|
|
|
# there is nothing waiting for it, so it will get leaked into the reactor (which
|
|
|
|
# get leaked into the reactor or some other function which
|
|
|
|
# would then get picked up by the next thing the reactor does). We therefore
|
|
|
|
# wasn't expecting it. We therefore need to reset the context
|
|
|
|
# need to reset the logcontext here (set the `sentinel` logcontext) before
|
|
|
|
# here.
|
|
|
|
# yielding control back to the reactor.
|
|
|
|
#
|
|
|
|
#
|
|
|
|
# (If this feels asymmetric, consider it this way: we are
|
|
|
|
# (If this feels asymmetric, consider it this way: we are
|
|
|
|
# effectively forking a new thread of execution. We are
|
|
|
|
# effectively forking a new thread of execution. We are
|
|
|
|
@@ -880,7 +931,7 @@ def run_coroutine_in_background(
|
|
|
|
# which is supposed to have a single entry and exit point. But
|
|
|
|
# which is supposed to have a single entry and exit point. But
|
|
|
|
# by spawning off another deferred, we are effectively
|
|
|
|
# by spawning off another deferred, we are effectively
|
|
|
|
# adding a new exit point.)
|
|
|
|
# adding a new exit point.)
|
|
|
|
d.addBoth(_set_context_cb, ctx)
|
|
|
|
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
|
|
|
|
return d
|
|
|
|
return d
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -888,24 +939,43 @@ T = TypeVar("T")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
|
|
|
|
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
|
|
|
|
"""Given a deferred, make it follow the Synapse logcontext rules:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
If the deferred has completed, essentially does nothing (just returns another
|
|
|
|
|
|
|
|
completed deferred with the result/failure).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
If the deferred has not yet completed, resets the logcontext before
|
|
|
|
|
|
|
|
returning a deferred. Then, when the deferred completes, restores the
|
|
|
|
|
|
|
|
current logcontext before running callbacks/errbacks.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
(This is more-or-less the opposite operation to run_in_background.)
|
|
|
|
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
|
|
|
|
Given a deferred, make it follow the Synapse logcontext rules:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- If the deferred has completed, essentially does nothing (just returns another
|
|
|
|
|
|
|
|
completed deferred with the result/failure).
|
|
|
|
|
|
|
|
- If the deferred has not yet completed, resets the logcontext before returning a
|
|
|
|
|
|
|
|
incomplete deferred. Then, when the deferred completes, restores the current
|
|
|
|
|
|
|
|
logcontext before running callbacks/errbacks.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
This means the resultant deferred can be awaited without leaking the current
|
|
|
|
|
|
|
|
logcontext to the reactor (which would then get erroneously picked up by the next
|
|
|
|
|
|
|
|
thing the reactor does), and also means that the logcontext is preserved when the
|
|
|
|
|
|
|
|
deferred completes.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
(This is more-or-less the opposite operation to run_in_background in terms of how it
|
|
|
|
|
|
|
|
handles log contexts.)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Pretty much equivalent to using `with PreserveLoggingContext():`, i.e. it clears the
|
|
|
|
|
|
|
|
logcontext before awaiting (and so before execution passes back to the reactor) and
|
|
|
|
|
|
|
|
restores the old context once the awaitable completes (execution passes from the
|
|
|
|
|
|
|
|
reactor back to the code).
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
# The deferred has already completed
|
|
|
|
if deferred.called and not deferred.paused:
|
|
|
|
if deferred.called and not deferred.paused:
|
|
|
|
# it looks like this deferred is ready to run any callbacks we give it
|
|
|
|
# it looks like this deferred is ready to run any callbacks we give it
|
|
|
|
# immediately. We may as well optimise out the logcontext faffery.
|
|
|
|
# immediately. We may as well optimise out the logcontext faffery.
|
|
|
|
return deferred
|
|
|
|
return deferred
|
|
|
|
|
|
|
|
|
|
|
|
# ok, we can't be sure that a yield won't block, so let's reset the
|
|
|
|
# Our goal is to have the caller logcontext unchanged after they yield/await the
|
|
|
|
# logcontext, and add a callback to the deferred to restore it.
|
|
|
|
# returned deferred.
|
|
|
|
|
|
|
|
#
|
|
|
|
|
|
|
|
# When the caller yield/await's the returned deferred, it may yield
|
|
|
|
|
|
|
|
# control back to the reactor. To avoid leaking the current logcontext to the
|
|
|
|
|
|
|
|
# reactor (which would then get erroneously picked up by the next thing the reactor
|
|
|
|
|
|
|
|
# does) while the deferred runs in the reactor event loop, we reset the logcontext
|
|
|
|
|
|
|
|
# and add a callback to the deferred to restore it so the caller's logcontext is
|
|
|
|
|
|
|
|
# active when the deferred completes.
|
|
|
|
prev_context = set_current_context(SENTINEL_CONTEXT)
|
|
|
|
prev_context = set_current_context(SENTINEL_CONTEXT)
|
|
|
|
deferred.addBoth(_set_context_cb, prev_context)
|
|
|
|
deferred.addBoth(_set_context_cb, prev_context)
|
|
|
|
return deferred
|
|
|
|
return deferred
|
|
|
|
|