From 566670c363915691826b5b435c4aa7acde61b408 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Thu, 27 Nov 2025 16:44:17 +0000 Subject: [PATCH] Move `RestartDelayedEventServlet` to workers (#19207) --- changelog.d/19207.feature | 1 + docker/configure_workers_and_start.py | 1 + docs/upgrade.md | 8 ++++++ docs/workers.md | 5 +++- synapse/handlers/delayed_events.py | 25 ++++++++++++------- synapse/rest/client/delayed_events.py | 4 +-- .../storage/databases/main/delayed_events.py | 19 ++++++++++++-- 7 files changed, 49 insertions(+), 14 deletions(-) create mode 100644 changelog.d/19207.feature diff --git a/changelog.d/19207.feature b/changelog.d/19207.feature new file mode 100644 index 0000000000..e64562c350 --- /dev/null +++ b/changelog.d/19207.feature @@ -0,0 +1 @@ +Allow restarting delayed event timeouts on workers. diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index e19b0a0039..e7cbd701b8 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -196,6 +196,7 @@ WORKERS_CONFIG: dict[str, dict[str, Any]] = { "^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload", "^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$", "^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$", + "^/_matrix/client/unstable/org.matrix.msc4140/delayed_events(/.*/restart)?$", ], "shared_extra_conf": {}, "worker_extra_conf": "", diff --git a/docs/upgrade.md b/docs/upgrade.md index 350b71fe47..5e7fa31580 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -119,6 +119,14 @@ stacking them up. 
You can monitor the currently running background updates with # Upgrading to v1.144.0 +## Worker support for unstable MSC4140 `/restart` endpoint + +The following unstable endpoint pattern may now be routed to worker processes: + +``` +^/_matrix/client/unstable/org.matrix.msc4140/delayed_events/.*/restart$ +``` + ## Unstable mutual rooms endpoint is now behind an experimental feature flag The unstable mutual rooms endpoint from diff --git a/docs/workers.md b/docs/workers.md index f766b40251..2bc8afa74f 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -285,10 +285,13 @@ information. # User directory search requests ^/_matrix/client/(r0|v3|unstable)/user_directory/search$ + # Unstable MSC4140 support + ^/_matrix/client/unstable/org.matrix.msc4140/delayed_events(/.*/restart)?$ + Additionally, the following REST endpoints can be handled for GET requests: + # Push rules requests ^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/ - ^/_matrix/client/unstable/org.matrix.msc4140/delayed_events # Account data requests ^/_matrix/client/(r0|v3|unstable)/.*/tags diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index de21e3abbb..8817b65316 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -96,16 +96,18 @@ class DelayedEventsHandler: self.notify_new_event, ) - # Delayed events that are already marked as processed on startup might not have been - # sent properly on the last run of the server, so unmark them to send them again. + # Now process any delayed events that are due to be sent. + # + # We set `reprocess_events` to True in case any events had been + # marked as processed, but had not yet actually been sent, + # before the homeserver stopped. + # # Caveat: this will double-send delayed events that successfully persisted, but failed # to be removed from the DB table of delayed events. # TODO: To avoid double-sending, scan the timeline to find which of these events were # already sent. 
To do so, must store delay_ids in sent events to retrieve them later. - await self._store.unprocess_delayed_events() - events, next_send_ts = await self._store.process_timeout_delayed_events( - self._get_current_ts() + self._get_current_ts(), reprocess_events=True ) if next_send_ts: @@ -423,18 +425,23 @@ class DelayedEventsHandler: Raises: NotFoundError: if no matching delayed event could be found. """ - assert self._is_master await self._delayed_event_mgmt_ratelimiter.ratelimit( None, request.getClientAddress().host ) - await make_deferred_yieldable(self._initialized_from_db) + + # Note: We don't need to wait on `self._initialized_from_db` here as the + # events that it deals with are already marked as processed. + # + # `restart_delayed_event` will skip over such events entirely. next_send_ts = await self._store.restart_delayed_event( delay_id, self._get_current_ts() ) - if self._next_send_ts_changed(next_send_ts): - self._schedule_next_at(next_send_ts) + # Only the main process handles sending delayed events. + if self._is_master: + if self._next_send_ts_changed(next_send_ts): + self._schedule_next_at(next_send_ts) async def send(self, request: SynapseRequest, delay_id: str) -> None: """ diff --git a/synapse/rest/client/delayed_events.py b/synapse/rest/client/delayed_events.py index 69d1013e72..7afecffe2d 100644 --- a/synapse/rest/client/delayed_events.py +++ b/synapse/rest/client/delayed_events.py @@ -156,10 +156,10 @@ class DelayedEventsServlet(RestServlet): def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - # The following can't currently be instantiated on workers. + # Most of the following can't currently be instantiated on workers. 
if hs.config.worker.worker_app is None: UpdateDelayedEventServlet(hs).register(http_server) CancelDelayedEventServlet(hs).register(http_server) - RestartDelayedEventServlet(hs).register(http_server) SendDelayedEventServlet(hs).register(http_server) + RestartDelayedEventServlet(hs).register(http_server) DelayedEventsServlet(hs).register(http_server) diff --git a/synapse/storage/databases/main/delayed_events.py b/synapse/storage/databases/main/delayed_events.py index 7f72be46f5..5547150515 100644 --- a/synapse/storage/databases/main/delayed_events.py +++ b/synapse/storage/databases/main/delayed_events.py @@ -259,7 +259,7 @@ class DelayedEventsStore(SQLBaseStore): ] async def process_timeout_delayed_events( - self, current_ts: Timestamp + self, current_ts: Timestamp, reprocess_events: bool = False ) -> tuple[ list[DelayedEventDetails], Timestamp | None, @@ -268,6 +268,16 @@ class DelayedEventsStore(SQLBaseStore): Marks for processing all delayed events that should have been sent prior to the provided time that haven't already been marked as such. + Args: + current_ts: The current timestamp. + reprocess_events: Whether to reprocess already-processed delayed + events. If set to True, events which are marked as processed + will have their `send_ts` re-checked. + + This is mainly useful for recovering from a server restart, + which could have occurred between an event being marked as + processed and the event actually being sent. + Returns: The details of all newly-processed delayed events, and the send time of the next delayed event to be sent, if any. """ @@ -292,7 +302,12 @@ class DelayedEventsStore(SQLBaseStore): ) ) sql_update = "UPDATE delayed_events SET is_processed = TRUE" - sql_where = "WHERE send_ts <= ? AND NOT is_processed" + sql_where = "WHERE send_ts <= ?" + + if not reprocess_events: + # Skip already-processed events. 
+ sql_where += " AND NOT is_processed" + sql_args = (current_ts,) sql_order = "ORDER BY send_ts" if isinstance(self.database_engine, PostgresEngine):