Compare commits

...

1 Commits

Author SHA1 Message Date
Quentin Gliech
724a48ecdd WIP: use a contextvar to store the logcontext 2025-08-18 12:26:05 -05:00

View File

@@ -34,6 +34,7 @@ import logging
import threading import threading
import typing import typing
import warnings import warnings
from contextvars import ContextVar
from types import TracebackType from types import TracebackType
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@@ -660,13 +661,12 @@ class PreserveLoggingContext:
) )
_thread_local = threading.local() _current_context: ContextVar[LoggingContextOrSentinel] = ContextVar("current_context")
_thread_local.current_context = SENTINEL_CONTEXT
def current_context() -> LoggingContextOrSentinel: def current_context() -> LoggingContextOrSentinel:
"""Get the current logging context from thread local storage""" """Get the current logging context from thread local storage"""
return getattr(_thread_local, "current_context", SENTINEL_CONTEXT) return _current_context.get(SENTINEL_CONTEXT)
def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel: def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel:
@@ -687,7 +687,7 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe
if current is not context: if current is not context:
rusage = get_thread_resource_usage() rusage = get_thread_resource_usage()
current.stop(rusage) current.stop(rusage)
_thread_local.current_context = context _current_context.set(context)
context.start(rusage) context.start(rusage)
return current return current
@@ -803,7 +803,6 @@ def run_in_background(
CRITICAL error about an unhandled error will be logged without much CRITICAL error about an unhandled error will be logged without much
indication about where it came from. indication about where it came from.
""" """
current = current_context()
try: try:
res = f(*args, **kwargs) res = f(*args, **kwargs)
except Exception: except Exception:
@@ -832,23 +831,6 @@ def run_in_background(
# optimise out the messing about # optimise out the messing about
return d return d
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
return d return d
@@ -868,65 +850,19 @@ def run_coroutine_in_background(
cannot change the log contexts. cannot change the log contexts.
""" """
current = current_context() return defer.ensureDeferred(coroutine)
d = defer.ensureDeferred(coroutine)
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
return d
T = TypeVar("T") T = TypeVar("T")
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]": def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
"""Given a deferred, make it follow the Synapse logcontext rules:
If the deferred has completed, essentially does nothing (just returns another
completed deferred with the result/failure).
If the deferred has not yet completed, resets the logcontext before
returning a deferred. Then, when the deferred completes, restores the
current logcontext before running callbacks/errbacks.
(This is more-or-less the opposite operation to run_in_background.)
"""
if deferred.called and not deferred.paused:
# it looks like this deferred is ready to run any callbacks we give it
# immediately. We may as well optimise out the logcontext faffery.
return deferred
# ok, we can't be sure that a yield won't block, so let's reset the
# logcontext, and add a callback to the deferred to restore it.
prev_context = set_current_context(SENTINEL_CONTEXT)
deferred.addBoth(_set_context_cb, prev_context)
return deferred return deferred
ResultT = TypeVar("ResultT") ResultT = TypeVar("ResultT")
def _set_context_cb(result: ResultT, context: LoggingContextOrSentinel) -> ResultT:
"""A callback function which just sets the logging context"""
set_current_context(context)
return result
def defer_to_thread( def defer_to_thread(
reactor: "ISynapseReactor", f: Callable[P, R], *args: P.args, **kwargs: P.kwargs reactor: "ISynapseReactor", f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
) -> "defer.Deferred[R]": ) -> "defer.Deferred[R]":
@@ -998,18 +934,6 @@ def defer_to_threadpool(
A Deferred which fires a callback with the result of `f`, or an A Deferred which fires a callback with the result of `f`, or an
errback if `f` throws an exception. errback if `f` throws an exception.
""" """
curr_context = current_context() return make_deferred_yieldable(
if not curr_context: threads.deferToThreadPool(reactor, threadpool, f, *args, **kwargs)
logger.warning( )
"Calling defer_to_threadpool from sentinel context: metrics will be lost"
)
parent_context = None
else:
assert isinstance(curr_context, LoggingContext)
parent_context = curr_context
def g() -> R:
with LoggingContext(str(curr_context), parent_context=parent_context):
return f(*args, **kwargs)
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))