Compare commits

...

2 Commits

Author         SHA1        Message                                                                    Date
Andrew Morgan  0fbf296c99  1.138.1                                                                    2025-09-24 11:32:48 +01:00
Andrew Morgan  0c8594c9a8  Fix performance regression related to delayed events processing (#18926)  2025-09-24 11:30:47 +01:00
12 changed files with 260 additions and 21 deletions


@@ -1,3 +1,12 @@
+# Synapse 1.138.1 (2025-09-24)
+
+## Bugfixes
+
+- Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature. ([\#18926](https://github.com/element-hq/synapse/issues/18926))
+
 # Synapse 1.138.0 (2025-09-09)
 
 No significant changes since 1.138.0rc1.
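In short: cancelling conflicting delayed state events previously fetched each state delta's full event one at a time just to read its sender, on every batch of deltas, whether or not any delayed events were pending. The commit below batches the sender lookups into one query and bails out early when the `delayed_events` table is empty. A rough illustration of the access-pattern change only; the in-memory "table" and helper bodies here are made up and are not Synapse's storage layer:

```python
from typing import Dict, List, Optional

# Toy in-memory "events table" (event_id -> sender); purely illustrative.
EVENTS: Dict[str, Optional[str]] = {
    "$event_a": "@alice:example.org",
    "$event_b": "@bob:example.org",
}


def get_event_sender(event_id: str) -> Optional[str]:
    """Before: one lookup (one DB query, in the real code) per state delta."""
    return EVENTS.get(event_id)


def get_senders_batched(event_ids: List[str]) -> Dict[str, Optional[str]]:
    """After: a single batched lookup covering every delta's event at once."""
    return {eid: EVENTS[eid] for eid in event_ids if eid in EVENTS}


event_ids = ["$event_a", "$event_b"]

# Before the fix: len(event_ids) round-trips per batch of deltas.
senders_slow = [get_event_sender(eid) for eid in event_ids]

# After the fix: one round-trip, then cheap dictionary lookups.
senders_fast = get_senders_batched(event_ids)
```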

debian/changelog (vendored)

@@ -1,3 +1,9 @@
+matrix-synapse-py3 (1.138.1) stable; urgency=medium
+
+  * New Synapse release 1.138.1.
+
+ -- Synapse Packaging team <packages@matrix.org>  Wed, 24 Sep 2025 11:32:38 +0100
+
 matrix-synapse-py3 (1.138.0) stable; urgency=medium
 
   * New Synapse release 1.138.0.


@@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust"
 
 [tool.poetry]
 name = "matrix-synapse"
-version = "1.138.0"
+version = "1.138.1"
 description = "Homeserver for the Matrix decentralised comms protocol"
 authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
 license = "AGPL-3.0-or-later"


@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple
 from twisted.internet.interfaces import IDelayedCall
 
 from synapse.api.constants import EventTypes
-from synapse.api.errors import ShadowBanError
+from synapse.api.errors import ShadowBanError, SynapseError
 from synapse.api.ratelimiting import Ratelimiter
 from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
 from synapse.logging.opentracing import set_tag
@@ -45,6 +45,7 @@ from synapse.types import (
 )
 from synapse.util.events import generate_fake_event_id
 from synapse.util.metrics import Measure
+from synapse.util.sentinel import Sentinel
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -146,10 +147,37 @@ class DelayedEventsHandler:
         )
 
     async def _unsafe_process_new_event(self) -> None:
+        # We purposefully fetch the current max room stream ordering before
+        # doing anything else, as it could increment during processing of state
+        # deltas. We want to avoid updating `delayed_events_stream_pos` past
+        # the stream ordering of the state deltas we've processed. Otherwise
+        # we'll leave gaps in our processing.
+        room_max_stream_ordering = self._store.get_room_max_stream_ordering()
+
+        # Check that there are actually any delayed events to process. If not, bail early.
+        delayed_events_count = await self._store.get_count_of_delayed_events()
+        if delayed_events_count == 0:
+            # There are no delayed events to process. Update the
+            # `delayed_events_stream_pos` to the latest `events` stream pos and
+            # exit early.
+            self._event_pos = room_max_stream_ordering
+
+            logger.debug(
+                "No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
+                room_max_stream_ordering,
+            )
+            await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)
+            event_processing_positions.labels(
+                name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
+            ).set(room_max_stream_ordering)
+            return
+
         # If self._event_pos is None, it means we haven't fetched it from the DB yet
         if self._event_pos is None:
             self._event_pos = await self._store.get_delayed_events_stream_pos()
-            room_max_stream_ordering = self._store.get_room_max_stream_ordering()
             if self._event_pos > room_max_stream_ordering:
                 # apparently, we've processed more events than exist in the database!
                 # this can happen if events are removed with history purge or similar.
@@ -167,7 +195,7 @@ class DelayedEventsHandler:
             self._clock, name="delayed_events_delta", server_name=self.server_name
         ):
             room_max_stream_ordering = self._store.get_room_max_stream_ordering()
-            if self._event_pos == room_max_stream_ordering:
+            if self._event_pos >= room_max_stream_ordering:
                 return
 
             logger.debug(
@@ -202,23 +230,81 @@ class DelayedEventsHandler:
         Process current state deltas to cancel other users' pending delayed events
         that target the same state.
         """
+        # Get the senders of each delta's state event (as sender information is
+        # not currently stored in the `current_state_deltas` table).
+        event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
+            [delta.event_id for delta in deltas if delta.event_id is not None]
+        )
+
+        # Note: No need to batch as `get_current_state_deltas` will only ever
+        # return 100 rows at a time.
         for delta in deltas:
+            logger.debug(
+                "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
+            )
+
+            # `delta.event_id` and `delta.sender` can be `None` in a few valid
+            # cases (see the docstring of
+            # `get_current_state_delta_membership_changes_for_user` for details).
             if delta.event_id is None:
-                logger.debug(
-                    "Not handling delta for deleted state: %r %r",
+                # TODO: Differentiate between this being caused by a state reset
+                # which removed a user from a room, or the homeserver
+                # purposefully having left the room. We can do so by checking
+                # whether there are any local memberships still left in the
+                # room. If so, then this is the result of a state reset.
+                #
+                # If it is a state reset, we should avoid cancelling new,
+                # delayed state events due to old state resurfacing. So we
+                # should skip and log a warning in this case.
+                #
+                # If the homeserver has left the room, then we should cancel all
+                # delayed state events intended for this room, as there is no
+                # need to try and send a delayed event into a room we've left.
+                logger.warning(
+                    "Skipping state delta (%r, %r) without corresponding event ID. "
+                    "This can happen if the homeserver has left the room (in which "
+                    "case this can be ignored), or if there has been a state reset "
+                    "which has caused the sender to be kicked out of the room",
                     delta.event_type,
                     delta.state_key,
                 )
                 continue
 
-            logger.debug(
-                "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
-            )
-
-            event = await self._store.get_event(delta.event_id, allow_none=True)
-            if not event:
+            sender_str = event_id_and_sender_dict.get(
+                delta.event_id, Sentinel.UNSET_SENTINEL
+            )
+            if sender_str is None:
+                # An event exists, but the `sender` field was "null" and Synapse
+                # incorrectly accepted the event. This is not expected.
+                logger.error(
+                    "Skipping state delta with event ID '%s' as 'sender' was None. "
+                    "This is unexpected - please report it as a bug!",
+                    delta.event_id,
+                )
+                continue
+            if sender_str is Sentinel.UNSET_SENTINEL:
+                # We have an event ID, but the event was not found in the
+                # datastore. This can happen if a room, or its history, is
+                # purged. State deltas related to the room are left behind, but
+                # the event no longer exists.
+                #
+                # As we cannot get the sender of this event, we can't calculate
+                # whether to cancel delayed events related to this one. So we skip.
+                logger.debug(
+                    "Skipping state delta with event ID '%s' - the room, or its history, may have been purged",
+                    delta.event_id,
+                )
+                continue
+
+            try:
+                sender = UserID.from_string(sender_str)
+            except SynapseError as e:
+                logger.error(
+                    "Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
+                    sender_str,
+                    e,
+                )
                 continue
 
-            sender = UserID.from_string(event.sender)
-
             next_send_ts = await self._store.cancel_delayed_state_events(
                 room_id=delta.room_id,
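One subtlety in `_unsafe_process_new_event` above deserves a note: the max stream ordering is snapshotted before the emptiness check, so the handler only ever fast-forwards `delayed_events_stream_pos` to a position it observed before deciding nothing was pending. A condensed sketch of that control flow; the `Store` protocol here is a simplified stand-in, not the real storage class:

```python
from typing import Protocol


class Store(Protocol):
    """Simplified stand-ins for the storage methods the handler uses."""

    def get_room_max_stream_ordering(self) -> int: ...
    async def get_count_of_delayed_events(self) -> int: ...
    async def update_delayed_events_stream_pos(self, pos: int) -> None: ...


async def process_new_events(store: Store) -> None:
    # Snapshot the max position first: deltas that land after this point are
    # picked up on the next run instead of being silently skipped.
    max_pos = store.get_room_max_stream_ordering()

    if await store.get_count_of_delayed_events() == 0:
        # Nothing is pending, so it is safe to fast-forward the stream
        # position, but only to a value observed *before* the check above.
        await store.update_delayed_events_stream_pos(max_pos)
        return

    # ... otherwise, process state deltas up to `max_pos` as usual ...
```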


@@ -1540,7 +1540,7 @@ class PresenceHandler(BasePresenceHandler):
             self.clock, name="presence_delta", server_name=self.server_name
         ):
             room_max_stream_ordering = self.store.get_room_max_stream_ordering()
-            if self._event_pos == room_max_stream_ordering:
+            if self._event_pos >= room_max_stream_ordering:
                 return
 
             logger.debug(


@@ -13,7 +13,6 @@
 #
-import enum
 import logging
 from itertools import chain
 from typing import (

@@ -75,6 +74,7 @@ from synapse.types.handlers.sliding_sync import (
 )
 from synapse.types.state import StateFilter
 from synapse.util import MutableOverlayMapping
+from synapse.util.sentinel import Sentinel
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer

@@ -83,12 +83,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
-class Sentinel(enum.Enum):
-    # defining a sentinel in this way allows mypy to correctly handle the
-    # type of a dictionary lookup and subsequent type narrowing.
-    UNSET_SENTINEL = object()
-
 # Helper definition for the types that we might return. We do this to avoid
 # copying data between types (which can be expensive for many rooms).
 RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]


@@ -682,6 +682,8 @@ class StateStorageController:
             - the stream id which these results go up to
             - list of current_state_delta_stream rows. If it is empty, we are
               up to date.
+
+            A maximum of 100 rows will be returned.
         """
         # FIXME(faster_joins): what do we do here?
         # https://github.com/matrix-org/synapse/issues/13008


@@ -182,6 +182,21 @@ class DelayedEventsStore(SQLBaseStore):
             "restart_delayed_event", restart_delayed_event_txn
         )
 
+    async def get_count_of_delayed_events(self) -> int:
+        """Returns the number of pending delayed events in the DB."""
+
+        def _get_count_of_delayed_events(txn: LoggingTransaction) -> int:
+            sql = "SELECT count(*) FROM delayed_events"
+            txn.execute(sql)
+            resp = txn.fetchone()
+            return resp[0] if resp is not None else 0
+
+        return await self.db_pool.runInteraction(
+            "get_count_of_delayed_events",
+            _get_count_of_delayed_events,
+        )
+
     async def get_all_delayed_events_for_user(
         self,
         user_localpart: str,


@@ -2135,6 +2135,39 @@ class EventsWorkerStore(SQLBaseStore):
 
         return rows, to_token, True
 
+    async def get_senders_for_event_ids(
+        self, event_ids: Collection[str]
+    ) -> Dict[str, Optional[str]]:
+        """
+        Given a sequence of event IDs, return the sender associated with each.
+
+        Args:
+            event_ids: A collection of event IDs as strings.
+
+        Returns:
+            A dict of event ID -> sender of the event.
+
+            If a given event ID does not exist in the `events` table, then no
+            entry for that event ID will be returned.
+        """
+
+        def _get_senders_for_event_ids(
+            txn: LoggingTransaction,
+        ) -> Dict[str, Optional[str]]:
+            rows = self.db_pool.simple_select_many_txn(
+                txn=txn,
+                table="events",
+                column="event_id",
+                iterable=event_ids,
+                keyvalues={},
+                retcols=["event_id", "sender"],
+            )
+            return dict(rows)
+
+        return await self.db_pool.runInteraction(
+            "get_senders_for_event_ids", _get_senders_for_event_ids
+        )
+
     @cached(max_entries=5000)
     async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
         res = await self.db_pool.simple_select_one(
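The docstring's contract, that events absent from the `events` table are omitted from the result rather than mapped to `None`, is what lets the delayed-events handler distinguish a purged event from one stored with a null sender. A hypothetical call site, with the wiring and event IDs invented for illustration:

```python
from typing import Dict, Optional


async def senders_example(store) -> None:
    # `store` stands in for the main datastore; the event IDs are made up.
    senders: Dict[str, Optional[str]] = await store.get_senders_for_event_ids(
        ["$stored_event", "$purged_event"]
    )
    # A purged event is simply absent from the result, while a stored event
    # whose `sender` column is null would map to None, so callers can tell
    # the two cases apart with a sentinel default.
    assert "$purged_event" not in senders
```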


@@ -94,6 +94,8 @@ class StateDeltasStore(SQLBaseStore):
             - the stream id which these results go up to
             - list of current_state_delta_stream rows. If it is empty, we are
               up to date.
+
+            A maximum of 100 rows will be returned.
         """
         prev_stream_id = int(prev_stream_id)

synapse/util/sentinel.py (new file)

@@ -0,0 +1,21 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2025 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+import enum
+
+
+class Sentinel(enum.Enum):
+    # defining a sentinel in this way allows mypy to correctly handle the
+    # type of a dictionary lookup and subsequent type narrowing.
+    UNSET_SENTINEL = object()
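Why an `enum.Enum` member rather than a bare `object()`: the enum member has its own type, so mypy can narrow a union after an `is` comparison. A small sketch of the lookup pattern the delayed-events handler uses above; only the `Sentinel` class mirrors the new module, the dict and function are illustrative:

```python
import enum
from typing import Dict, Optional, Union


class Sentinel(enum.Enum):
    # Mirrors synapse/util/sentinel.py: an enum member has a distinct type,
    # which mypy can narrow away with `is` comparisons.
    UNSET_SENTINEL = object()


def describe_sender(senders: Dict[str, Optional[str]], event_id: str) -> str:
    # The lookup's type is Union[Optional[str], Sentinel].
    sender: Union[Optional[str], Sentinel] = senders.get(
        event_id, Sentinel.UNSET_SENTINEL
    )
    if sender is Sentinel.UNSET_SENTINEL:
        return "no such event"  # key absent: event purged or never stored
    if sender is None:
        return "event stored with a null sender"  # key present, value None
    return f"sent by {sender}"  # mypy has narrowed `sender` to str here


print(describe_sender({"$a": "@alice:example.org", "$b": None}, "$a"))
```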


@@ -20,7 +20,7 @@
 #
 import logging
-from typing import List, Optional
+from typing import Dict, List, Optional
 
 from twisted.internet.testing import MemoryReactor
@@ -39,6 +39,77 @@ from tests.unittest import HomeserverTestCase
 logger = logging.getLogger(__name__)
 
+class EventsTestCase(HomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(
+        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
+    ) -> None:
+        self._store = self.hs.get_datastores().main
+
+    def test_get_senders_for_event_ids(self) -> None:
+        """Tests the `get_senders_for_event_ids` storage function."""
+        users_and_tokens: Dict[str, str] = {}
+        for localpart_suffix in range(10):
+            localpart = f"user_{localpart_suffix}"
+            user_id = self.register_user(localpart, "rabbit")
+            token = self.login(localpart, "rabbit")
+            users_and_tokens[user_id] = token
+
+        room_creator_user_id = self.register_user("room_creator", "rabbit")
+        room_creator_token = self.login("room_creator", "rabbit")
+        users_and_tokens[room_creator_user_id] = room_creator_token
+
+        # Create a room and invite some users.
+        room_id = self.helper.create_room_as(
+            room_creator_user_id, tok=room_creator_token
+        )
+
+        event_ids_to_senders: Dict[str, str] = {}
+        for user_id, token in users_and_tokens.items():
+            if user_id == room_creator_user_id:
+                continue
+
+            self.helper.invite(
+                room=room_id,
+                targ=user_id,
+                tok=room_creator_token,
+            )
+
+            # Have the user accept the invite and join the room.
+            self.helper.join(
+                room=room_id,
+                user=user_id,
+                tok=token,
+            )
+
+            # Have the user send an event.
+            response = self.helper.send_event(
+                room_id=room_id,
+                type="m.room.message",
+                content={
+                    "msgtype": "m.text",
+                    "body": f"hello, I'm {user_id}!",
+                },
+                tok=token,
+            )
+
+            # Record the event ID and sender.
+            event_id = response["event_id"]
+            event_ids_to_senders[event_id] = user_id
+
+        # Check that `get_senders_for_event_ids` returns the correct data.
+        response = self.get_success(
+            self._store.get_senders_for_event_ids(list(event_ids_to_senders.keys()))
+        )
+        self.assert_dict(event_ids_to_senders, response)
+
 class ExtremPruneTestCase(HomeserverTestCase):
     servlets = [
         admin.register_servlets,