Compare commits

...

13 Commits

Author SHA1 Message Date
Andrew Morgan
b4877a6915 state reset TODO 2025-09-19 11:26:16 +01:00
Andrew Morgan
a67b0362d2 logger.error for missing sender 2025-09-19 11:25:25 +01:00
Andrew Morgan
ef38b2e714 Prevent potential infinite loop if we bypass the max
Apply the same fix to a similar loop in the presence handling code.
2025-09-19 11:17:39 +01:00
Andrew Morgan
1eb25f10cc Revert "Only schedule next delayed event once"
This reverts commit 9f1c7e32bb.

This change caused delayed events to not be scheduled, thus in turn
causing the `TestDelayedEvents/delayed_.*_events_are_sent_on_timeout`
tests to fail.

I don't fully understand why my change didn't work, but have spent
enough time looking at the code. The optimisation was very slight
anyhow, so I don't think it's worth pursuing for longer.
2025-09-18 15:14:01 +01:00
Andrew Morgan
68fb55200f More error handling 2025-09-18 13:20:22 +01:00
Andrew Morgan
4ff8184868 link MSC4140 in newsfile 2025-09-18 11:46:15 +01:00
Andrew Morgan
d1974a2c46 Note that order of fetching stream orderings is important 2025-09-18 11:46:15 +01:00
Andrew Morgan
8d975485b5 Note 100 row limit in docstrings 2025-09-18 11:46:15 +01:00
Andrew Morgan
a0faac9c60 newsfile 2025-09-17 13:42:25 +01:00
Andrew Morgan
76b885b1ff Bail out early if there are no delayed events to send
There are zero delayed events on matrix.org's database. We can save a
lot of time pulling out state deltas by first just checking if there are
any delayed events to actually deal with.
2025-09-17 13:38:21 +01:00
Andrew Morgan
9f1c7e32bb Only schedule next delayed event once
There's no need to do this for every event in the batch. Just find the min time for all delayed events.
2025-09-17 13:37:38 +01:00
Andrew Morgan
e3ad18cec9 Stop calling get_event and use new storage function
We also only call it once for all events in the batch, instead of per state delta.
2025-09-17 13:36:38 +01:00
Andrew Morgan
fadb7eb662 Add a storage function to pull out senders for event IDs
This is preferable to pulling out entire events and only fetching the `sender` field.
2025-09-17 13:35:44 +01:00
7 changed files with 105 additions and 7 deletions

1
changelog.d/18926.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature.

View File

@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple
from twisted.internet.interfaces import IDelayedCall
from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
@@ -146,10 +146,37 @@ class DelayedEventsHandler:
)
async def _unsafe_process_new_event(self) -> None:
# We purposefully fetch the current max room stream ordering before
# doing anything else, as it could increment during processing of state
# deltas. We want to avoid updating `delayed_events_stream_pos` past
# the stream ordering of the state deltas we've processed. Otherwise
# we'll leave gaps in our processing.
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
self._event_pos = room_max_stream_ordering
logger.debug(
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
room_max_stream_ordering,
)
await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)
event_processing_positions.labels(
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
).set(room_max_stream_ordering)
return
# If self._event_pos is None then means we haven't fetched it from the DB yet
if self._event_pos is None:
self._event_pos = await self._store.get_delayed_events_stream_pos()
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
@@ -167,7 +194,7 @@ class DelayedEventsHandler:
self._clock, name="delayed_events_delta", server_name=self.server_name
):
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
if self._event_pos >= room_max_stream_ordering:
return
logger.debug(
@@ -202,6 +229,16 @@ class DelayedEventsHandler:
Process current state deltas to cancel other users' pending delayed events
that target the same state.
"""
# TODO: How to handle state deltas that are the result of a state reset?
# Get the senders of each delta's state event (as sender information is
# not currently stored in the `current_state_deltas` table).
event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
[delta.event_id for delta in deltas if delta.event_id is not None]
)
# Note: No need to batch as `get_current_state_deltas` will only ever
# return 100 rows at a time.
for delta in deltas:
if delta.event_id is None:
logger.debug(
@@ -215,10 +252,23 @@ class DelayedEventsHandler:
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)
event = await self._store.get_event(delta.event_id, allow_none=True)
if not event:
sender_str = event_id_and_sender_dict.get(delta.event_id, None)
if sender_str is None:
logger.error(
"Skipping state delta with event ID '%s' as 'sender' was unknown. This is unexpected - please report it as a bug!",
delta.event_id,
)
continue
try:
sender = UserID.from_string(sender_str)
except SynapseError as e:
logger.error(
"Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
sender_str,
e,
)
continue
sender = UserID.from_string(event.sender)
next_send_ts = await self._store.cancel_delayed_state_events(
room_id=delta.room_id,

View File

@@ -1548,7 +1548,7 @@ class PresenceHandler(BasePresenceHandler):
self.clock, name="presence_delta", server_name=self.server_name
):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
if self._event_pos >= room_max_stream_ordering:
return
logger.debug(

View File

@@ -682,6 +682,8 @@ class StateStorageController:
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.
A maximum of 100 rows will be returned.
"""
# FIXME(faster_joins): what do we do here?
# https://github.com/matrix-org/synapse/issues/13008

View File

@@ -182,6 +182,21 @@ class DelayedEventsStore(SQLBaseStore):
"restart_delayed_event", restart_delayed_event_txn
)
async def get_count_of_delayed_events(self) -> int:
"""Returns the number of pending delayed events in the DB."""
def _get_count_of_delayed_events(txn: LoggingTransaction) -> int:
sql = "SELECT count(*) FROM delayed_events"
txn.execute(sql)
resp = txn.fetchone()
return resp[0] if resp is not None else 0
return await self.db_pool.runInteraction(
"get_count_of_delayed_events",
_get_count_of_delayed_events,
)
async def get_all_delayed_events_for_user(
self,
user_localpart: str,

View File

@@ -2135,6 +2135,34 @@ class EventsWorkerStore(SQLBaseStore):
return rows, to_token, True
async def get_senders_for_event_ids(
self, event_ids: Collection[str]
) -> Dict[str, str]:
"""
Given a sequence of event IDs, return the sender associated with each.
Args:
event_ids: A collection of event IDs as strings.
Returns:
A dict of event ID -> sender of the event.
"""
def _get_senders_for_event_ids(txn: LoggingTransaction) -> Dict[str, str]:
rows = self.db_pool.simple_select_many_txn(
txn=txn,
table="events",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=["event_id", "sender"],
)
return dict(rows)
return await self.db_pool.runInteraction(
"get_senders_for_event_ids", _get_senders_for_event_ids
)
@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
res = await self.db_pool.simple_select_one(

View File

@@ -94,6 +94,8 @@ class StateDeltasStore(SQLBaseStore):
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.
A maximum of 100 rows will be returned.
"""
prev_stream_id = int(prev_stream_id)