Compare commits

...

1 Commits

Author SHA1 Message Date
Quentin Gliech
724a48ecdd WIP: use a contextvar to store the logcontext 2025-08-18 12:26:05 -05:00

View File

@@ -34,6 +34,7 @@ import logging
import threading
import typing
import warnings
from contextvars import ContextVar
from types import TracebackType
from typing import (
TYPE_CHECKING,
@@ -660,13 +661,12 @@ class PreserveLoggingContext:
)
_thread_local = threading.local()
_thread_local.current_context = SENTINEL_CONTEXT
_current_context: ContextVar[LoggingContextOrSentinel] = ContextVar("current_context")
def current_context() -> LoggingContextOrSentinel:
"""Get the current logging context from thread local storage"""
return getattr(_thread_local, "current_context", SENTINEL_CONTEXT)
return _current_context.get(SENTINEL_CONTEXT)
def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel:
@@ -687,7 +687,7 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe
if current is not context:
rusage = get_thread_resource_usage()
current.stop(rusage)
_thread_local.current_context = context
_current_context.set(context)
context.start(rusage)
return current
@@ -803,7 +803,6 @@ def run_in_background(
CRITICAL error about an unhandled error will be logged without much
indication about where it came from.
"""
current = current_context()
try:
res = f(*args, **kwargs)
except Exception:
@@ -832,23 +831,6 @@ def run_in_background(
# optimise out the messing about
return d
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
return d
@@ -868,65 +850,19 @@ def run_coroutine_in_background(
cannot change the log contexts.
"""
current = current_context()
d = defer.ensureDeferred(coroutine)
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
return d
return defer.ensureDeferred(coroutine)
T = TypeVar("T")
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
"""Given a deferred, make it follow the Synapse logcontext rules:
If the deferred has completed, essentially does nothing (just returns another
completed deferred with the result/failure).
If the deferred has not yet completed, resets the logcontext before
returning a deferred. Then, when the deferred completes, restores the
current logcontext before running callbacks/errbacks.
(This is more-or-less the opposite operation to run_in_background.)
"""
if deferred.called and not deferred.paused:
# it looks like this deferred is ready to run any callbacks we give it
# immediately. We may as well optimise out the logcontext faffery.
return deferred
# ok, we can't be sure that a yield won't block, so let's reset the
# logcontext, and add a callback to the deferred to restore it.
prev_context = set_current_context(SENTINEL_CONTEXT)
deferred.addBoth(_set_context_cb, prev_context)
return deferred
ResultT = TypeVar("ResultT")
def _set_context_cb(result: ResultT, context: LoggingContextOrSentinel) -> ResultT:
"""A callback function which just sets the logging context"""
set_current_context(context)
return result
def defer_to_thread(
reactor: "ISynapseReactor", f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
) -> "defer.Deferred[R]":
@@ -998,18 +934,6 @@ def defer_to_threadpool(
A Deferred which fires a callback with the result of `f`, or an
errback if `f` throws an exception.
"""
curr_context = current_context()
if not curr_context:
logger.warning(
"Calling defer_to_threadpool from sentinel context: metrics will be lost"
)
parent_context = None
else:
assert isinstance(curr_context, LoggingContext)
parent_context = curr_context
def g() -> R:
with LoggingContext(str(curr_context), parent_context=parent_context):
return f(*args, **kwargs)
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
return make_deferred_yieldable(
threads.deferToThreadPool(reactor, threadpool, f, *args, **kwargs)
)