Port call_later

This commit is contained in:
Erik Johnston
2025-11-26 14:53:51 +00:00
parent d470dee438
commit 41d2b490ef
24 changed files with 141 additions and 52 deletions

View File

@@ -278,14 +278,14 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
clock.call_later(
0,
Duration(seconds=0),
performance_stats_init,
)
# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME.as_secs(),
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME,
phone_stats_home,
hs,
stats,

View File

@@ -77,6 +77,7 @@ from synapse.logging.context import run_in_background
from synapse.storage.databases.main import DataStore
from synapse.types import DeviceListUpdates, JsonMapping
from synapse.util.clock import Clock, DelayedCallWrapper
from synapse.util.duration import Duration
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -504,7 +505,7 @@ class _Recoverer:
self.scheduled_recovery: DelayedCallWrapper | None = None
def recover(self) -> None:
delay = 2**self.backoff_counter
delay = Duration(seconds=2**self.backoff_counter)
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
self.scheduled_recovery = self.clock.call_later(
delay,

View File

@@ -42,6 +42,7 @@ from synapse.types import (
UserID,
create_requester,
)
from synapse.util.duration import Duration
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel
@@ -92,7 +93,7 @@ class DelayedEventsHandler:
# Kick off again (without blocking) to catch any missed notifications
# that may have fired before the callback was added.
self._clock.call_later(
0,
Duration(seconds=0),
self.notify_new_event,
)
@@ -501,17 +502,17 @@ class DelayedEventsHandler:
def _schedule_next_at(self, next_send_ts: Timestamp) -> None:
delay = next_send_ts - self._get_current_ts()
delay_sec = delay / 1000 if delay > 0 else 0
delay_duration = Duration(milliseconds=max(delay, 0))
if self._next_delayed_event_call is None:
self._next_delayed_event_call = self._clock.call_later(
delay_sec,
delay_duration,
self.hs.run_as_background_process,
"_send_on_timeout",
self._send_on_timeout,
)
else:
self._next_delayed_event_call.reset(delay_sec)
self._next_delayed_event_call.reset(delay_duration.as_secs())
async def get_all_for_user(self, requester: Requester) -> list[JsonDict]:
"""Return all pending delayed events requested by the given user."""

View File

@@ -434,14 +434,11 @@ class MessageHandler:
# Figure out how many seconds we need to wait before expiring the event.
now_ms = self.clock.time_msec()
delay = (expiry_ts - now_ms) / 1000
delay = Duration(milliseconds=max(expiry_ts - now_ms, 0))
# callLater doesn't support negative delays, so trim the delay to 0 if we're
# in that case.
if delay < 0:
delay = 0
logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay)
logger.info(
"Scheduling expiry for event %s in %.3fs", event_id, delay.as_secs()
)
self._scheduled_expiry = self.clock.call_later(
delay,

View File

@@ -862,7 +862,7 @@ class PresenceHandler(BasePresenceHandler):
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
self.clock.call_later(
30,
Duration(seconds=30),
self.clock.looping_call,
self._handle_timeouts,
Duration(seconds=5),
@@ -872,7 +872,7 @@ class PresenceHandler(BasePresenceHandler):
# internally.
if self._presence_enabled:
self.clock.call_later(
60,
Duration(seconds=60),
self.clock.looping_call,
self._persist_unpersisted_changes,
Duration(minutes=1),

View File

@@ -2191,7 +2191,7 @@ class RoomForgetterHandler(StateDeltasHandler):
# We kick this off to pick up outstanding work from before the last restart.
self._clock.call_later(
0,
Duration(seconds=0),
self.notify_new_event,
)

View File

@@ -32,6 +32,7 @@ from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import JsonDict
from synapse.util.duration import Duration
from synapse.util.events import get_plain_text_topic_from_event_content
if TYPE_CHECKING:
@@ -72,7 +73,7 @@ class StatsHandler:
# We kick this off so that we don't have to wait for a change before
# we start populating stats
self.clock.call_later(
0,
Duration(seconds=0),
self.notify_new_event,
)

View File

@@ -40,6 +40,7 @@ from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.databases.main.user_directory import SearchResult
from synapse.storage.roommember import ProfileInfo
from synapse.types import UserID
from synapse.util.duration import Duration
from synapse.util.metrics import Measure
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import non_null_str_or_none
@@ -52,7 +53,7 @@ logger = logging.getLogger(__name__)
# Don't refresh a stale user directory entry, using a Federation /profile request,
# for 60 seconds. This gives time for other state events to arrive (which will
# then be coalesced such that only one /profile request is made).
USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000
USER_DIRECTORY_STALE_REFRESH_TIME = Duration(seconds=60)
# Maximum number of remote servers that we will attempt to refresh profiles for
# in one go.
@@ -60,7 +61,7 @@ MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5
# As long as we have servers to refresh (without backoff), keep adding more
# every 15 seconds.
INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15
INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = Duration(seconds=15)
def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int:
@@ -137,13 +138,13 @@ class UserDirectoryHandler(StateDeltasHandler):
# We kick this off so that we don't have to wait for a change before
# we start populating the user directory
self.clock.call_later(
0,
Duration(seconds=0),
self.notify_new_event,
)
# Kick off the profile refresh process on startup
self._refresh_remote_profiles_call_later = self.clock.call_later(
10,
Duration(seconds=10),
self.kick_off_remote_profile_refresh_process,
)
@@ -550,7 +551,7 @@ class UserDirectoryHandler(StateDeltasHandler):
now_ts = self.clock.time_msec()
await self.store.set_remote_user_profile_in_user_dir_stale(
user_id,
next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS,
next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME.as_millis(),
retry_counter=0,
)
# Schedule a wake-up to refresh the user directory for this server.
@@ -558,13 +559,13 @@ class UserDirectoryHandler(StateDeltasHandler):
# other servers ahead of it in the queue to get in the way of updating
# the profile if the server only just sent us an event.
self.clock.call_later(
USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
USER_DIRECTORY_STALE_REFRESH_TIME + Duration(seconds=1),
self.kick_off_remote_profile_refresh_process_for_remote_server,
UserID.from_string(user_id).domain,
)
# Schedule a wake-up to handle any backoffs that may occur in the future.
self.clock.call_later(
2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
USER_DIRECTORY_STALE_REFRESH_TIME * 2 + Duration(seconds=1),
self.kick_off_remote_profile_refresh_process,
)
return
@@ -656,7 +657,9 @@ class UserDirectoryHandler(StateDeltasHandler):
if not users:
return
_, _, next_try_at_ts = users[0]
delay = ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2
delay = Duration(
milliseconds=next_try_at_ts - self.clock.time_msec()
) + Duration(seconds=2)
self._refresh_remote_profiles_call_later = self.clock.call_later(
delay,
self.kick_off_remote_profile_refresh_process,

View File

@@ -187,7 +187,7 @@ class WorkerLocksHandler:
lock.release_lock()
self._clock.call_later(
0,
Duration(seconds=0),
_wake_all_locks,
locks,
)

View File

@@ -87,6 +87,7 @@ from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import ISynapseReactor, StrSequence
from synapse.util.async_helpers import timeout_deferred
from synapse.util.clock import Clock
from synapse.util.duration import Duration
from synapse.util.json import json_decoder
if TYPE_CHECKING:
@@ -172,7 +173,7 @@ def _make_scheduler(clock: Clock) -> Callable[[Callable[[], object]], IDelayedCa
def _scheduler(x: Callable[[], object]) -> IDelayedCall:
return clock.call_later(
_EPSILON,
Duration(seconds=_EPSILON),
x,
)

View File

@@ -1445,8 +1445,7 @@ class ModuleApi:
desc = f.__name__
return self._clock.call_later(
# convert ms to seconds as needed by call_later.
msec * 0.001,
Duration(milliseconds=msec),
self._hs.run_as_background_process,
desc,
lambda: maybe_awaitable(f(*args, **kwargs)),

View File

@@ -29,6 +29,7 @@ from synapse.push import Pusher, PusherConfig, PusherConfigException, ThrottlePa
from synapse.push.mailer import Mailer
from synapse.push.push_types import EmailReason
from synapse.storage.databases.main.event_push_actions import EmailPushAction
from synapse.util.duration import Duration
from synapse.util.threepids import validate_email
if TYPE_CHECKING:
@@ -229,7 +230,7 @@ class EmailPusher(Pusher):
if soonest_due_at is not None:
delay = self.seconds_until(soonest_due_at)
self.timed_call = self.hs.get_clock().call_later(
delay,
Duration(seconds=delay),
self.on_timer,
)

View File

@@ -337,7 +337,7 @@ class HttpPusher(Pusher):
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
self.timed_call = self.hs.get_clock().call_later(
self.backoff_delay,
Duration(seconds=self.backoff_delay),
self.on_timer,
)
self.backoff_delay = min(

View File

@@ -632,7 +632,7 @@ class DatabasePool:
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
self._clock.call_later(
0.0,
Duration(seconds=0),
self.hs.run_as_background_process,
"upsert_safety_check",
self._check_safe_to_upsert,
@@ -680,7 +680,7 @@ class DatabasePool:
# If there's any updates still running, reschedule to run.
if background_update_names:
self._clock.call_later(
15.0,
Duration(seconds=15),
self.hs.run_as_background_process,
"upsert_safety_check",
self._check_safe_to_upsert,

View File

@@ -45,6 +45,7 @@ from synapse.storage.database import (
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.util.caches.descriptors import CachedFunction
from synapse.util.duration import Duration
from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
@@ -71,11 +72,11 @@ GET_E2E_CROSS_SIGNING_SIGNATURES_FOR_DEVICE_CACHE_NAME = (
# How long between cache invalidation table cleanups, once we have caught up
# with the backlog.
REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h")
REGULAR_CLEANUP_INTERVAL = Duration(hours=1)
# How long between cache invalidation table cleanups, before we have caught
# up with the backlog.
CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m")
CATCH_UP_CLEANUP_INTERVAL = Duration(minutes=1)
# Maximum number of cache invalidation rows to delete at once.
CLEAN_UP_MAX_BATCH_SIZE = 20_000
@@ -139,7 +140,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self.database_engine, PostgresEngine
):
self.hs.get_clock().call_later(
CATCH_UP_CLEANUP_INTERVAL_MS / 1000,
CATCH_UP_CLEANUP_INTERVAL,
self._clean_up_cache_invalidation_wrapper,
)
@@ -825,12 +826,12 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
# Vary how long we wait before calling again depending on whether we
# are still sifting through backlog or we have caught up.
if in_backlog:
next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
next_interval = CATCH_UP_CLEANUP_INTERVAL
else:
next_interval = REGULAR_CLEANUP_INTERVAL_MS
next_interval = REGULAR_CLEANUP_INTERVAL
self.hs.get_clock().call_later(
next_interval / 1000,
next_interval,
self._clean_up_cache_invalidation_wrapper,
)

View File

@@ -214,7 +214,7 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore):
if hs.config.worker.run_background_tasks:
self.clock.call_later(
0.0,
Duration(seconds=0),
self._set_expiration_date_when_missing,
)

View File

@@ -114,7 +114,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
Duration(minutes=1),
)
self.hs.get_clock().call_later(
1,
Duration(seconds=1),
self._count_known_servers,
)
federation_known_servers_gauge.register_hook(

View File

@@ -819,7 +819,9 @@ def timeout_deferred(
# We don't track these calls since they are short.
delayed_call = clock.call_later(
timeout, time_it_out, call_later_cancel_on_shutdown=cancel_on_shutdown
Duration(seconds=timeout),
time_it_out,
call_later_cancel_on_shutdown=cancel_on_shutdown,
)
def convert_cancelled(value: Failure) -> Failure:

View File

@@ -42,6 +42,7 @@ from synapse.logging.opentracing import (
from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred
from synapse.util.caches import EvictionReason, register_cache
from synapse.util.clock import Clock
from synapse.util.duration import Duration
logger = logging.getLogger(__name__)
@@ -120,7 +121,7 @@ class ResponseCache(Generic[KV]):
self._result_cache: dict[KV, ResponseCacheEntry] = {}
self.clock = clock
self.timeout_sec = timeout_ms / 1000.0
self.timeout = Duration(milliseconds=timeout_ms)
self._name = name
self._metrics = register_cache(
@@ -195,9 +196,9 @@ class ResponseCache(Generic[KV]):
# if this cache has a non-zero timeout, and the callback has not cleared
# the should_cache bit, we leave it in the cache for now and schedule
# its removal later.
if self.timeout_sec and context.should_cache:
if self.timeout and context.should_cache:
self.clock.call_later(
self.timeout_sec,
self.timeout,
self._entry_timeout,
key,
# We don't need to track these calls since they don't hold any strong

View File

@@ -254,7 +254,7 @@ class Clock:
def call_later(
self,
delay: float,
delay: Duration,
callback: Callable,
*args: Any,
call_later_cancel_on_shutdown: bool = True,
@@ -267,7 +267,7 @@ class Clock:
`run_as_background_process` to give it more specific label and track metrics.
Args:
delay: How long to wait in seconds.
delay: How long to wait.
callback: Function to call
*args: Postional arguments to pass to function.
call_later_cancel_on_shutdown: Whether this call should be tracked for cleanup during
@@ -325,7 +325,9 @@ class Clock:
# We can ignore the lint here since this class is the one location callLater should
# be called.
call = self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) # type: ignore[call-later-not-tracked]
call = self._reactor.callLater(
delay.as_secs(), wrapped_callback, *args, **kwargs
) # type: ignore[call-later-not-tracked]
logger.debug(
"call_later(%s): Scheduled call for %ss later (tracked for shutdown: %s)",

View File

@@ -13,6 +13,7 @@
#
from datetime import timedelta
from typing import overload
# Constant so we don't keep creating new timedelta objects when calling
# `.as_millis()`.
@@ -38,3 +39,79 @@ class Duration(timedelta):
def as_secs(self) -> float:
"""Returns the duration in seconds."""
return self.total_seconds()
# Override arithmetic operations to return Duration instances
def __add__(self, other: timedelta) -> "Duration":
"""Add two durations together, returning a Duration."""
result = super().__add__(other)
return Duration(seconds=result.total_seconds())
def __radd__(self, other: timedelta) -> "Duration":
"""Add two durations together (reversed), returning a Duration."""
result = super().__radd__(other)
return Duration(seconds=result.total_seconds())
def __sub__(self, other: timedelta) -> "Duration":
"""Subtract two durations, returning a Duration."""
result = super().__sub__(other)
return Duration(seconds=result.total_seconds())
def __rsub__(self, other: timedelta) -> "Duration":
"""Subtract two durations (reversed), returning a Duration."""
result = super().__rsub__(other)
return Duration(seconds=result.total_seconds())
def __mul__(self, other: float) -> "Duration":
"""Multiply a duration by a scalar, returning a Duration."""
result = super().__mul__(other)
return Duration(seconds=result.total_seconds())
def __rmul__(self, other: float) -> "Duration":
"""Multiply a duration by a scalar (reversed), returning a Duration."""
result = super().__rmul__(other)
return Duration(seconds=result.total_seconds())
@overload
def __truediv__(self, other: timedelta) -> float: ...
@overload
def __truediv__(self, other: float) -> "Duration": ...
def __truediv__(self, other: float | timedelta) -> "Duration | float":
"""Divide a duration by a scalar or another duration.
If dividing by a scalar, returns a Duration.
If dividing by a timedelta, returns a float ratio.
"""
result = super().__truediv__(other)
if isinstance(other, timedelta):
# Dividing by a timedelta gives a float ratio
assert isinstance(result, float)
return result
else:
# Dividing by a scalar gives a Duration
assert isinstance(result, timedelta)
return Duration(seconds=result.total_seconds())
@overload
def __floordiv__(self, other: timedelta) -> int: ...
@overload
def __floordiv__(self, other: int) -> "Duration": ...
def __floordiv__(self, other: int | timedelta) -> "Duration | int":
"""Floor divide a duration by a scalar or another duration.
If dividing by a scalar, returns a Duration.
If dividing by a timedelta, returns an int ratio.
"""
result = super().__floordiv__(other)
if isinstance(other, timedelta):
# Dividing by a timedelta gives an int ratio
assert isinstance(result, int)
return result
else:
# Dividing by a scalar gives a Duration
assert isinstance(result, timedelta)
return Duration(seconds=result.total_seconds())

View File

@@ -417,6 +417,6 @@ class _PerHostRatelimiter:
pass
self.clock.call_later(
0.0,
Duration(seconds=0),
start_next_request,
)

View File

@@ -469,7 +469,7 @@ class TaskScheduler:
# Try launch a new task since we've finished with this one.
self._clock.call_later(
0.1,
Duration(milliseconds=100),
self._launch_scheduled_tasks,
)

View File

@@ -239,7 +239,9 @@ class LoggingContextTestCase(unittest.TestCase):
callback_finished = True
with LoggingContext(name="foo", server_name="test_server"):
clock.call_later(0, lambda: defer.ensureDeferred(competing_callback()))
clock.call_later(
Duration(seconds=0), lambda: defer.ensureDeferred(competing_callback())
)
self._check_test_key("foo")
await clock.sleep(Duration(seconds=0))
self._check_test_key("foo")