mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Remove sentinel logcontext in Clock utilities (looping_call, looping_call_now, call_later) (#18907)
Part of https://github.com/element-hq/synapse/issues/18905
Lints for ensuring we use `Clock.call_later` instead of
`reactor.callLater`, etc. are coming in
https://github.com/element-hq/synapse/pull/18944
### Testing strategy
1. Configure Synapse to log at the `DEBUG` level
1. Start Synapse: `poetry run synapse_homeserver --config-path
homeserver.yaml`
1. Wait 10 seconds for the [database profiling
loop](https://github.com/element-hq/synapse/blob/9cc4001778/synapse/storage/database.py#L711)
to execute
1. Notice the logcontext being used for the `Total database time` log
line
Before (`sentinel`):
```
2025-09-10 16:36:58,651 - synapse.storage.TIME - 707 - DEBUG - sentinel - Total database time: 0.646% {room_forgetter_stream_pos(2): 0.131%, reap_monthly_active_users(1): 0.083%, get_device_change_last_converted_pos(1): 0.078%}
```
After (`looping_call`):
```
2025-09-10 16:36:58,651 - synapse.storage.TIME - 707 - DEBUG - looping_call - Total database time: 0.646% {room_forgetter_stream_pos(2): 0.131%, reap_monthly_active_users(1): 0.083%, get_device_change_last_converted_pos(1): 0.078%}
```
This commit is contained in:
1
changelog.d/18907.misc
Normal file
1
changelog.d/18907.misc
Normal file
@@ -0,0 +1 @@
|
||||
Remove `sentinel` logcontext usage in `Clock` utilities like `looping_call` and `call_later`.
|
||||
@@ -23,6 +23,7 @@ import attr
|
||||
from typing_extensions import ParamSpec
|
||||
|
||||
from twisted.internet import defer, task
|
||||
from twisted.internet.defer import Deferred
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
from twisted.internet.task import LoopingCall
|
||||
|
||||
@@ -46,6 +47,8 @@ class Clock:
|
||||
|
||||
async def sleep(self, seconds: float) -> None:
|
||||
d: defer.Deferred[float] = defer.Deferred()
|
||||
# Start task in the `sentinel` logcontext, to avoid leaking the current context
|
||||
# into the reactor once it finishes.
|
||||
with context.PreserveLoggingContext():
|
||||
self._reactor.callLater(seconds, d.callback, seconds)
|
||||
await d
|
||||
@@ -74,8 +77,9 @@ class Clock:
|
||||
this functionality thanks to this function being a thin wrapper around
|
||||
`twisted.internet.task.LoopingCall`.
|
||||
|
||||
Note that the function will be called with no logcontext, so if it is anything
|
||||
other than trivial, you probably want to wrap it in run_as_background_process.
|
||||
Note that the function will be called with generic `looping_call` logcontext, so
|
||||
if it is anything other than a trivial task, you probably want to wrap it in
|
||||
`run_as_background_process` to give it a more specific label and track metrics.
|
||||
|
||||
Args:
|
||||
f: The function to call repeatedly.
|
||||
@@ -97,8 +101,9 @@ class Clock:
|
||||
As with `looping_call`: subsequent calls are not scheduled until after the
|
||||
Awaitable returned by a previous call has finished.
|
||||
|
||||
Also as with `looping_call`: the function is called with no logcontext and
|
||||
you probably want to wrap it in `run_as_background_process`.
|
||||
Note that the function will be called with generic `looping_call` logcontext, so
|
||||
if it is anything other than a trivial task, you probably want to wrap it in
|
||||
`run_as_background_process` to give it a more specific label and track metrics.
|
||||
|
||||
Args:
|
||||
f: The function to call repeatedly.
|
||||
@@ -117,9 +122,43 @@ class Clock:
|
||||
**kwargs: P.kwargs,
|
||||
) -> LoopingCall:
|
||||
"""Common functionality for `looping_call` and `looping_call_now`"""
|
||||
call = task.LoopingCall(f, *args, **kwargs)
|
||||
|
||||
def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred:
|
||||
assert context.current_context() is context.SENTINEL_CONTEXT, (
|
||||
"Expected `looping_call` callback from the reactor to start with the sentinel logcontext "
|
||||
f"but saw {context.current_context()}. In other words, another task shouldn't have "
|
||||
"leaked their logcontext to us."
|
||||
)
|
||||
|
||||
# Because this is a callback from the reactor, we will be using the
|
||||
# `sentinel` log context at this point. We want the function to log with
|
||||
# some logcontext as we want to know which server the logs came from.
|
||||
#
|
||||
# We use `PreserveLoggingContext` to prevent our new `looping_call`
|
||||
# logcontext from finishing as soon as we exit this function, in case `f`
|
||||
# returns an awaitable/deferred which would continue running and may try to
|
||||
# restore the `looping_call` context when it's done (because it's trying to
|
||||
# adhere to the Synapse logcontext rules.)
|
||||
#
|
||||
# This also ensures that we return to the `sentinel` context when we exit
|
||||
# this function and yield control back to the reactor to avoid leaking the
|
||||
# current logcontext to the reactor (which would then get picked up and
|
||||
# associated with the next thing the reactor does)
|
||||
with context.PreserveLoggingContext(context.LoggingContext("looping_call")):
|
||||
# We use `run_in_background` to reset the logcontext after `f` (or the
|
||||
# awaitable returned by `f`) completes to avoid leaking the current
|
||||
# logcontext to the reactor
|
||||
return context.run_in_background(f, *args, **kwargs)
|
||||
|
||||
call = task.LoopingCall(wrapped_f, *args, **kwargs)
|
||||
call.clock = self._reactor
|
||||
d = call.start(msec / 1000.0, now=now)
|
||||
# If `now=True`, the function will be called here immediately so we need to be
|
||||
# in the sentinel context now.
|
||||
#
|
||||
# We want to start the task in the `sentinel` logcontext, to avoid leaking the
|
||||
# current context into the reactor after the function finishes.
|
||||
with context.PreserveLoggingContext():
|
||||
d = call.start(msec / 1000.0, now=now)
|
||||
d.addErrback(log_failure, "Looping call died", consumeErrors=False)
|
||||
return call
|
||||
|
||||
@@ -128,8 +167,9 @@ class Clock:
|
||||
) -> IDelayedCall:
|
||||
"""Call something later
|
||||
|
||||
Note that the function will be called with no logcontext, so if it is anything
|
||||
other than trivial, you probably want to wrap it in run_as_background_process.
|
||||
Note that the function will be called with generic `call_later` logcontext, so
|
||||
if it is anything other than a trivial task, you probably want to wrap it in
|
||||
`run_as_background_process` to give it a more specific label and track metrics.
|
||||
|
||||
Args:
|
||||
delay: How long to wait in seconds.
|
||||
@@ -139,11 +179,33 @@ class Clock:
|
||||
"""
|
||||
|
||||
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
|
||||
with context.PreserveLoggingContext():
|
||||
callback(*args, **kwargs)
|
||||
assert context.current_context() is context.SENTINEL_CONTEXT, (
|
||||
"Expected `call_later` callback from the reactor to start with the sentinel logcontext "
|
||||
f"but saw {context.current_context()}. In other words, another task shouldn't have "
|
||||
"leaked their logcontext to us."
|
||||
)
|
||||
|
||||
with context.PreserveLoggingContext():
|
||||
return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
|
||||
# Because this is a callback from the reactor, we will be using the
|
||||
# `sentinel` log context at this point. We want the function to log with
|
||||
# some logcontext as we want to know which server the logs came from.
|
||||
#
|
||||
# We use `PreserveLoggingContext` to prevent our new `call_later`
|
||||
# logcontext from finishing as soon as we exit this function, in case `callback`
|
||||
# returns an awaitable/deferred which would continue running and may try to
|
||||
# restore the `call_later` context when it's done (because it's trying to
|
||||
# adhere to the Synapse logcontext rules.)
|
||||
#
|
||||
# This also ensures that we return to the `sentinel` context when we exit
|
||||
# this function and yield control back to the reactor to avoid leaking the
|
||||
# current logcontext to the reactor (which would then get picked up and
|
||||
# associated with the next thing the reactor does)
|
||||
with context.PreserveLoggingContext(context.LoggingContext("call_later")):
|
||||
# We use `run_in_background` to reset the logcontext after `callback` (or the
|
||||
# awaitable returned by `callback`) completes to avoid leaking the current
|
||||
# logcontext to the reactor
|
||||
context.run_in_background(callback, *args, **kwargs)
|
||||
|
||||
return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
|
||||
|
||||
def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None:
|
||||
try:
|
||||
|
||||
@@ -31,6 +31,7 @@ from twisted.internet.testing import MemoryReactor
|
||||
|
||||
import synapse.rest.admin
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.push.emailpusher import EmailPusher
|
||||
from synapse.rest.client import login, room
|
||||
from synapse.rest.synapse.client.unsubscribe import UnsubscribeResource
|
||||
@@ -89,7 +90,7 @@ class EmailPusherTests(HomeserverTestCase):
|
||||
# This mocks out synapse.reactor.send_email._sendmail.
|
||||
d: Deferred = Deferred()
|
||||
self.email_attempts.append((d, args, kwargs))
|
||||
return d
|
||||
return make_deferred_yieldable(d)
|
||||
|
||||
hs.get_send_email_handler()._sendmail = sendmail # type: ignore[assignment]
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import Callable, Generator, cast
|
||||
|
||||
import twisted.python.failure
|
||||
@@ -28,6 +29,7 @@ from synapse.logging.context import (
|
||||
SENTINEL_CONTEXT,
|
||||
LoggingContext,
|
||||
PreserveLoggingContext,
|
||||
_Sentinel,
|
||||
current_context,
|
||||
make_deferred_yieldable,
|
||||
nested_logging_context,
|
||||
@@ -36,7 +38,10 @@ from synapse.logging.context import (
|
||||
from synapse.types import ISynapseReactor
|
||||
from synapse.util.clock import Clock
|
||||
|
||||
from .. import unittest
|
||||
from tests import unittest
|
||||
from tests.unittest import logcontext_clean
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
reactor = cast(ISynapseReactor, _reactor)
|
||||
|
||||
@@ -44,33 +49,212 @@ reactor = cast(ISynapseReactor, _reactor)
|
||||
class LoggingContextTestCase(unittest.TestCase):
|
||||
def _check_test_key(self, value: str) -> None:
|
||||
context = current_context()
|
||||
assert isinstance(context, LoggingContext)
|
||||
self.assertEqual(context.name, value)
|
||||
assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), (
|
||||
f"Expected LoggingContext({value}) but saw {context}"
|
||||
)
|
||||
self.assertEqual(
|
||||
str(context), value, f"Expected LoggingContext({value}) but saw {context}"
|
||||
)
|
||||
|
||||
@logcontext_clean
|
||||
def test_with_context(self) -> None:
|
||||
with LoggingContext("test"):
|
||||
self._check_test_key("test")
|
||||
|
||||
@logcontext_clean
|
||||
async def test_sleep(self) -> None:
|
||||
"""
|
||||
Test `Clock.sleep`
|
||||
"""
|
||||
clock = Clock(reactor)
|
||||
|
||||
# Sanity check that we start in the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
callback_finished = False
|
||||
|
||||
async def competing_callback() -> None:
|
||||
with LoggingContext("competing"):
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("competing")
|
||||
nonlocal callback_finished
|
||||
try:
|
||||
# A callback from the reactor should start with the sentinel context. In
|
||||
# other words, another task shouldn't have leaked their context to us.
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
with LoggingContext("competing"):
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("competing")
|
||||
|
||||
self._check_test_key("sentinel")
|
||||
finally:
|
||||
# When exceptions happen, we still want to mark the callback as finished
|
||||
# so that the test can complete and we see the underlying error.
|
||||
callback_finished = True
|
||||
|
||||
reactor.callLater(0, lambda: defer.ensureDeferred(competing_callback()))
|
||||
|
||||
with LoggingContext("one"):
|
||||
with LoggingContext("foo"):
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("one")
|
||||
self._check_test_key("foo")
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("foo")
|
||||
|
||||
self.assertTrue(
|
||||
callback_finished,
|
||||
"Callback never finished which means the test probably didn't wait long enough",
|
||||
)
|
||||
|
||||
# Back to the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
@logcontext_clean
|
||||
async def test_looping_call(self) -> None:
|
||||
"""
|
||||
Test `Clock.looping_call`
|
||||
"""
|
||||
clock = Clock(reactor)
|
||||
|
||||
# Sanity check that we start in the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
callback_finished = False
|
||||
|
||||
async def competing_callback() -> None:
|
||||
nonlocal callback_finished
|
||||
try:
|
||||
# A `looping_call` callback should have *some* logcontext since we should know
|
||||
# which server spawned this loop and which server the logs came from.
|
||||
self._check_test_key("looping_call")
|
||||
|
||||
with LoggingContext("competing"):
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("competing")
|
||||
|
||||
self._check_test_key("looping_call")
|
||||
finally:
|
||||
# When exceptions happen, we still want to mark the callback as finished
|
||||
# so that the test can complete and we see the underlying error.
|
||||
callback_finished = True
|
||||
|
||||
with LoggingContext("foo"):
|
||||
lc = clock.looping_call(
|
||||
lambda: defer.ensureDeferred(competing_callback()), 0
|
||||
)
|
||||
self._check_test_key("foo")
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("foo")
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("foo")
|
||||
|
||||
self.assertTrue(
|
||||
callback_finished,
|
||||
"Callback never finished which means the test probably didn't wait long enough",
|
||||
)
|
||||
|
||||
# Back to the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
# Stop the looping call to prevent "Reactor was unclean" errors
|
||||
lc.stop()
|
||||
|
||||
@logcontext_clean
|
||||
async def test_looping_call_now(self) -> None:
|
||||
"""
|
||||
Test `Clock.looping_call_now`
|
||||
"""
|
||||
clock = Clock(reactor)
|
||||
|
||||
# Sanity check that we start in the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
callback_finished = False
|
||||
|
||||
async def competing_callback() -> None:
|
||||
nonlocal callback_finished
|
||||
try:
|
||||
# A `looping_call` callback should have *some* logcontext since we should know
|
||||
# which server spawned this loop and which server the logs came from.
|
||||
self._check_test_key("looping_call")
|
||||
|
||||
with LoggingContext("competing"):
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("competing")
|
||||
|
||||
self._check_test_key("looping_call")
|
||||
finally:
|
||||
# When exceptions happen, we still want to mark the callback as finished
|
||||
# so that the test can complete and we see the underlying error.
|
||||
callback_finished = True
|
||||
|
||||
with LoggingContext("foo"):
|
||||
lc = clock.looping_call_now(
|
||||
lambda: defer.ensureDeferred(competing_callback()), 0
|
||||
)
|
||||
self._check_test_key("foo")
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("foo")
|
||||
|
||||
self.assertTrue(
|
||||
callback_finished,
|
||||
"Callback never finished which means the test probably didn't wait long enough",
|
||||
)
|
||||
|
||||
# Back to the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
# Stop the looping call to prevent "Reactor was unclean" errors
|
||||
lc.stop()
|
||||
|
||||
@logcontext_clean
|
||||
async def test_call_later(self) -> None:
|
||||
"""
|
||||
Test `Clock.call_later`
|
||||
"""
|
||||
clock = Clock(reactor)
|
||||
|
||||
# Sanity check that we start in the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
callback_finished = False
|
||||
|
||||
async def competing_callback() -> None:
|
||||
nonlocal callback_finished
|
||||
try:
|
||||
# A `call_later` callback should have *some* logcontext since we should know
|
||||
# which server spawned this call and which server the logs came from.
|
||||
self._check_test_key("call_later")
|
||||
|
||||
with LoggingContext("competing"):
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("competing")
|
||||
|
||||
self._check_test_key("call_later")
|
||||
finally:
|
||||
# When exceptions happen, we still want to mark the callback as finished
|
||||
# so that the test can complete and we see the underlying error.
|
||||
callback_finished = True
|
||||
|
||||
with LoggingContext("foo"):
|
||||
clock.call_later(0, lambda: defer.ensureDeferred(competing_callback()))
|
||||
self._check_test_key("foo")
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("foo")
|
||||
await clock.sleep(0)
|
||||
self._check_test_key("foo")
|
||||
|
||||
self.assertTrue(
|
||||
callback_finished,
|
||||
"Callback never finished which means the test probably didn't wait long enough",
|
||||
)
|
||||
|
||||
# Back to the sentinel context
|
||||
self._check_test_key("sentinel")
|
||||
|
||||
def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred:
|
||||
sentinel_context = current_context()
|
||||
|
||||
callback_completed = False
|
||||
|
||||
with LoggingContext("one"):
|
||||
with LoggingContext("foo"):
|
||||
# fire off function, but don't wait on it.
|
||||
d2 = run_in_background(function)
|
||||
|
||||
@@ -81,7 +265,7 @@ class LoggingContextTestCase(unittest.TestCase):
|
||||
|
||||
d2.addCallback(cb)
|
||||
|
||||
self._check_test_key("one")
|
||||
self._check_test_key("foo")
|
||||
|
||||
# now wait for the function under test to have run, and check that
|
||||
# the logcontext is left in a sane state.
|
||||
@@ -105,12 +289,14 @@ class LoggingContextTestCase(unittest.TestCase):
|
||||
# test is done once d2 finishes
|
||||
return d2
|
||||
|
||||
@logcontext_clean
|
||||
def test_run_in_background_with_blocking_fn(self) -> defer.Deferred:
|
||||
async def blocking_function() -> None:
|
||||
await Clock(reactor).sleep(0)
|
||||
|
||||
return self._test_run_in_background(blocking_function)
|
||||
|
||||
@logcontext_clean
|
||||
def test_run_in_background_with_non_blocking_fn(self) -> defer.Deferred:
|
||||
@defer.inlineCallbacks
|
||||
def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]:
|
||||
@@ -119,6 +305,7 @@ class LoggingContextTestCase(unittest.TestCase):
|
||||
|
||||
return self._test_run_in_background(nonblocking_function)
|
||||
|
||||
@logcontext_clean
|
||||
def test_run_in_background_with_chained_deferred(self) -> defer.Deferred:
|
||||
# a function which returns a deferred which looks like it has been
|
||||
# called, but is actually paused
|
||||
@@ -127,22 +314,25 @@ class LoggingContextTestCase(unittest.TestCase):
|
||||
|
||||
return self._test_run_in_background(testfunc)
|
||||
|
||||
@logcontext_clean
|
||||
def test_run_in_background_with_coroutine(self) -> defer.Deferred:
|
||||
async def testfunc() -> None:
|
||||
self._check_test_key("one")
|
||||
self._check_test_key("foo")
|
||||
d = defer.ensureDeferred(Clock(reactor).sleep(0))
|
||||
self.assertIs(current_context(), SENTINEL_CONTEXT)
|
||||
await d
|
||||
self._check_test_key("one")
|
||||
self._check_test_key("foo")
|
||||
|
||||
return self._test_run_in_background(testfunc)
|
||||
|
||||
@logcontext_clean
|
||||
def test_run_in_background_with_nonblocking_coroutine(self) -> defer.Deferred:
|
||||
async def testfunc() -> None:
|
||||
self._check_test_key("one")
|
||||
self._check_test_key("foo")
|
||||
|
||||
return self._test_run_in_background(testfunc)
|
||||
|
||||
@logcontext_clean
|
||||
@defer.inlineCallbacks
|
||||
def test_make_deferred_yieldable(
|
||||
self,
|
||||
@@ -156,7 +346,7 @@ class LoggingContextTestCase(unittest.TestCase):
|
||||
|
||||
sentinel_context = current_context()
|
||||
|
||||
with LoggingContext("one"):
|
||||
with LoggingContext("foo"):
|
||||
d1 = make_deferred_yieldable(blocking_function())
|
||||
# make sure that the context was reset by make_deferred_yieldable
|
||||
self.assertIs(current_context(), sentinel_context)
|
||||
@@ -164,15 +354,16 @@ class LoggingContextTestCase(unittest.TestCase):
|
||||
yield d1
|
||||
|
||||
# now it should be restored
|
||||
self._check_test_key("one")
|
||||
self._check_test_key("foo")
|
||||
|
||||
@logcontext_clean
|
||||
@defer.inlineCallbacks
|
||||
def test_make_deferred_yieldable_with_chained_deferreds(
|
||||
self,
|
||||
) -> Generator["defer.Deferred[object]", object, None]:
|
||||
sentinel_context = current_context()
|
||||
|
||||
with LoggingContext("one"):
|
||||
with LoggingContext("foo"):
|
||||
d1 = make_deferred_yieldable(_chained_deferred_function())
|
||||
# make sure that the context was reset by make_deferred_yieldable
|
||||
self.assertIs(current_context(), sentinel_context)
|
||||
@@ -180,8 +371,9 @@ class LoggingContextTestCase(unittest.TestCase):
|
||||
yield d1
|
||||
|
||||
# now it should be restored
|
||||
self._check_test_key("one")
|
||||
self._check_test_key("foo")
|
||||
|
||||
@logcontext_clean
|
||||
def test_nested_logging_context(self) -> None:
|
||||
with LoggingContext("foo"):
|
||||
nested_context = nested_logging_context(suffix="bar")
|
||||
|
||||
Reference in New Issue
Block a user