Compare commits

...

5 Commits

Author SHA1 Message Date
Eric Eastwood
c3a65ee55e Add changelog 2025-11-12 16:37:25 -06:00
Eric Eastwood
16fba85271 Add debug logs for the rest of the clock utils 2025-11-12 16:35:16 -06:00
Eric Eastwood
e3e4f68aab Add debug logging to looping_call 2025-11-12 16:29:30 -06:00
Eric Eastwood
865f7d879f Use call_id to log 2025-11-12 16:23:15 -06:00
Eric Eastwood
1e9c6fbef0 First stab tracking clock.call_later(...) 2025-11-12 16:02:38 -06:00
3 changed files with 130 additions and 64 deletions

1
changelog.d/19173.misc Normal file
View File

@@ -0,0 +1 @@
Add debug logs to track `Clock` utilities.

View File

@@ -65,8 +65,6 @@ from typing import (
Sequence,
)
from twisted.internet.interfaces import IDelayedCall
from synapse.appservice import (
ApplicationService,
ApplicationServiceState,
@@ -78,7 +76,7 @@ from synapse.events import EventBase
from synapse.logging.context import run_in_background
from synapse.storage.databases.main import DataStore
from synapse.types import DeviceListUpdates, JsonMapping
from synapse.util.clock import Clock
from synapse.util.clock import Clock, DelayedCallWrapper
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -503,7 +501,7 @@ class _Recoverer:
self.service = service
self.callback = callback
self.backoff_counter = 1
self.scheduled_recovery: IDelayedCall | None = None
self.scheduled_recovery: DelayedCallWrapper | None = None
def recover(self) -> None:
delay = 2**self.backoff_counter

View File

@@ -14,6 +14,7 @@
#
import logging
from typing import (
Any,
Callable,
@@ -30,10 +31,14 @@ from twisted.internet.task import LoopingCall
from synapse.logging import context
from synapse.types import ISynapseThreadlessReactor
from synapse.util import log_failure
from synapse.util.stringutils import random_string_insecure_fast
P = ParamSpec("P")
logger = logging.getLogger(__name__)
class Clock:
"""
A Clock wraps a Twisted reactor and provides utilities on top of it.
@@ -64,7 +69,12 @@ class Clock:
"""List of active looping calls"""
self._call_id_to_delayed_call: dict[int, IDelayedCall] = {}
"""Mapping from unique call ID to delayed call"""
"""
Mapping from unique call ID to delayed call.
For "performance", this only tracks a subset of delayed calls: those created
with `call_later` with `call_later_cancel_on_shutdown=True`.
"""
self._is_shutdown = False
"""Whether shutdown has been requested by the HomeServer"""
@@ -153,11 +163,20 @@ class Clock:
**kwargs: P.kwargs,
) -> LoopingCall:
"""Common functionality for `looping_call` and `looping_call_now`"""
instance_id = random_string_insecure_fast(5)
if self._is_shutdown:
raise Exception("Cannot start looping call. Clock has been shutdown")
looping_call_context_string = "looping_call"
if now:
looping_call_context_string = "looping_call_now"
def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred:
logger.debug(
"%s(%s): Executing callback", looping_call_context_string, instance_id
)
assert context.current_context() is context.SENTINEL_CONTEXT, (
"Expected `looping_call` callback from the reactor to start with the sentinel logcontext "
f"but saw {context.current_context()}. In other words, another task shouldn't have "
@@ -201,6 +220,17 @@ class Clock:
d = call.start(msec / 1000.0, now=now)
d.addErrback(log_failure, "Looping call died", consumeErrors=False)
self._looping_calls.append(call)
logger.debug(
"%s(%s): Scheduled looping call every %sms later",
looping_call_context_string,
instance_id,
msec,
# Find out who is scheduling the call which makes it easy to follow in the
# logs.
stack_info=True,
)
return call
def cancel_all_looping_calls(self, consumeErrors: bool = True) -> None:
@@ -226,7 +256,7 @@ class Clock:
*args: Any,
call_later_cancel_on_shutdown: bool = True,
**kwargs: Any,
) -> IDelayedCall:
) -> "DelayedCallWrapper":
"""Call something later
Note that the function will be called with generic `call_later` logcontext, so
@@ -245,74 +275,79 @@ class Clock:
issue, we can just track all delayed calls.
**kwargs: Key arguments to pass to function.
"""
call_id = self._delayed_call_id
self._delayed_call_id = self._delayed_call_id + 1
if self._is_shutdown:
raise Exception("Cannot start delayed call. Clock has been shutdown")
def create_wrapped_callback(
track_for_shutdown_cancellation: bool,
) -> Callable[P, None]:
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
assert context.current_context() is context.SENTINEL_CONTEXT, (
"Expected `call_later` callback from the reactor to start with the sentinel logcontext "
f"but saw {context.current_context()}. In other words, another task shouldn't have "
"leaked their logcontext to us."
)
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
logger.debug("call_later(%s): Executing callback", call_id)
# Because this is a callback from the reactor, we will be using the
# `sentinel` log context at this point. We want the function to log with
# some logcontext as we want to know which server the logs came from.
#
# We use `PreserveLoggingContext` to prevent our new `call_later`
# logcontext from finishing as soon as we exit this function, in case `f`
# returns an awaitable/deferred which would continue running and may try to
# restore the `call_later` context when it's done (because it's trying to
# adhere to the Synapse logcontext rules.)
#
# This also ensures that we return to the `sentinel` context when we exit
# this function and yield control back to the reactor to avoid leaking the
# current logcontext to the reactor (which would then get picked up and
# associated with the next thing the reactor does)
try:
with context.PreserveLoggingContext(
context.LoggingContext(
name="call_later", server_name=self._server_name
)
):
# We use `run_in_background` to reset the logcontext after `f` (or the
# awaitable returned by `f`) completes to avoid leaking the current
# logcontext to the reactor
context.run_in_background(callback, *args, **kwargs)
finally:
if track_for_shutdown_cancellation:
# We still want to remove the call from the tracking map. Even if
# the callback raises an exception.
self._call_id_to_delayed_call.pop(call_id)
assert context.current_context() is context.SENTINEL_CONTEXT, (
"Expected `call_later` callback from the reactor to start with the sentinel logcontext "
f"but saw {context.current_context()}. In other words, another task shouldn't have "
"leaked their logcontext to us."
)
return wrapped_callback
# Because this is a callback from the reactor, we will be using the
# `sentinel` log context at this point. We want the function to log with
# some logcontext as we want to know which server the logs came from.
#
# We use `PreserveLoggingContext` to prevent our new `call_later`
# logcontext from finishing as soon as we exit this function, in case `f`
# returns an awaitable/deferred which would continue running and may try to
# restore the `call_later` context when it's done (because it's trying to
# adhere to the Synapse logcontext rules.)
#
# This also ensures that we return to the `sentinel` context when we exit
# this function and yield control back to the reactor to avoid leaking the
# current logcontext to the reactor (which would then get picked up and
# associated with the next thing the reactor does)
try:
with context.PreserveLoggingContext(
context.LoggingContext(
name="call_later", server_name=self._server_name
)
):
# We use `run_in_background` to reset the logcontext after `f` (or the
# awaitable returned by `f`) completes to avoid leaking the current
# logcontext to the reactor
context.run_in_background(callback, *args, **kwargs)
finally:
if call_later_cancel_on_shutdown:
# We still want to remove the call from the tracking map. Even if
# the callback raises an exception.
self._call_id_to_delayed_call.pop(call_id)
# We can ignore the lint here since this class is the one location callLater should
# be called.
call = self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) # type: ignore[call-later-not-tracked]
logger.debug(
"call_later(%s): Scheduled call for %ss later (tracked for shutdown: %s)",
call_id,
delay,
call_later_cancel_on_shutdown,
# Find out who is scheduling the call which makes it easy to follow in the
# logs.
stack_info=True,
)
wrapped_call = DelayedCallWrapper(call, call_id, self)
if call_later_cancel_on_shutdown:
call_id = self._delayed_call_id
self._delayed_call_id = self._delayed_call_id + 1
self._call_id_to_delayed_call[call_id] = wrapped_call
# We can ignore the lint here since this class is the one location callLater
# should be called.
call = self._reactor.callLater(
delay, create_wrapped_callback(True), *args, **kwargs
) # type: ignore[call-later-not-tracked]
call = DelayedCallWrapper(call, call_id, self)
self._call_id_to_delayed_call[call_id] = call
return call
else:
# We can ignore the lint here since this class is the one location callLater should
# be called.
return self._reactor.callLater(
delay, create_wrapped_callback(False), *args, **kwargs
) # type: ignore[call-later-not-tracked]
return wrapped_call
def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None:
def cancel_call_later(
self, wrapped_call: "DelayedCallWrapper", ignore_errs: bool = False
) -> None:
try:
timer.cancel()
logger.debug(
"cancel_call_later: cancelling scheduled call %s", wrapped_call.call_id
)
wrapped_call.delayed_call.cancel()
except Exception:
if not ignore_errs:
raise
@@ -327,8 +362,11 @@ class Clock:
"""
# We make a copy here since calling `cancel()` on a delayed_call
# will result in the call removing itself from the map mid-iteration.
for call in list(self._call_id_to_delayed_call.values()):
for call_id, call in list(self._call_id_to_delayed_call.items()):
try:
logger.debug(
"cancel_all_delayed_calls: cancelling scheduled call %s", call_id
)
call.cancel()
except Exception:
if not ignore_errs:
@@ -352,8 +390,11 @@ class Clock:
*args: Positional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
instance_id = random_string_insecure_fast(5)
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
logger.debug("call_when_running(%s): Executing callback", instance_id)
# Since this callback can be invoked immediately if the reactor is already
# running, we can't always assume that we're running in the sentinel
# logcontext (i.e. we can't assert that we're in the sentinel context like
@@ -392,6 +433,14 @@ class Clock:
# callWhenRunning should be called.
self._reactor.callWhenRunning(wrapped_callback, *args, **kwargs) # type: ignore[prefer-synapse-clock-call-when-running]
logger.debug(
"call_when_running(%s): Scheduled call",
instance_id,
# Find out who is scheduling the call which makes it easy to follow in the
# logs.
stack_info=True,
)
def add_system_event_trigger(
self,
phase: str,
@@ -417,8 +466,16 @@ class Clock:
Returns:
an ID that can be used to remove this call with `reactor.removeSystemEventTrigger`.
"""
instance_id = random_string_insecure_fast(5)
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
logger.debug(
"add_system_event_trigger(%s): Executing %s %s callback",
instance_id,
phase,
event_type,
)
assert context.current_context() is context.SENTINEL_CONTEXT, (
"Expected `add_system_event_trigger` callback from the reactor to start with the sentinel logcontext "
f"but saw {context.current_context()}. In other words, another task shouldn't have "
@@ -449,6 +506,16 @@ class Clock:
# logcontext to the reactor
context.run_in_background(callback, *args, **kwargs)
logger.debug(
"add_system_event_trigger(%s) for %s %s",
instance_id,
phase,
event_type,
# Find out who is scheduling the call which makes it easy to follow in the
# logs.
stack_info=True,
)
# We can ignore the lint here since this class is the one location
# `addSystemEventTrigger` should be called.
return self._reactor.addSystemEventTrigger(