mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-07 01:20:16 +00:00
Compare commits
2 Commits
anoa/codex
...
v1.138.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0fbf296c99 | ||
|
|
0c8594c9a8 |
@@ -1,3 +1,12 @@
|
|||||||
|
# Synapse 1.138.1 (2025-09-24)
|
||||||
|
|
||||||
|
## Bugfixes
|
||||||
|
|
||||||
|
- Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature. ([\#18926](https://github.com/element-hq/synapse/issues/18926))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Synapse 1.138.0 (2025-09-09)
|
# Synapse 1.138.0 (2025-09-09)
|
||||||
|
|
||||||
No significant changes since 1.138.0rc1.
|
No significant changes since 1.138.0rc1.
|
||||||
|
|||||||
6
debian/changelog
vendored
6
debian/changelog
vendored
@@ -1,3 +1,9 @@
|
|||||||
|
matrix-synapse-py3 (1.138.1) stable; urgency=medium
|
||||||
|
|
||||||
|
* New Synapse release 1.138.1.
|
||||||
|
|
||||||
|
-- Synapse Packaging team <packages@matrix.org> Wed, 24 Sep 2025 11:32:38 +0100
|
||||||
|
|
||||||
matrix-synapse-py3 (1.138.0) stable; urgency=medium
|
matrix-synapse-py3 (1.138.0) stable; urgency=medium
|
||||||
|
|
||||||
* New Synapse release 1.138.0.
|
* New Synapse release 1.138.0.
|
||||||
|
|||||||
@@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust"
|
|||||||
|
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "matrix-synapse"
|
name = "matrix-synapse"
|
||||||
version = "1.138.0"
|
version = "1.138.1"
|
||||||
description = "Homeserver for the Matrix decentralised comms protocol"
|
description = "Homeserver for the Matrix decentralised comms protocol"
|
||||||
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
||||||
license = "AGPL-3.0-or-later"
|
license = "AGPL-3.0-or-later"
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple
|
|||||||
from twisted.internet.interfaces import IDelayedCall
|
from twisted.internet.interfaces import IDelayedCall
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.api.errors import ShadowBanError
|
from synapse.api.errors import ShadowBanError, SynapseError
|
||||||
from synapse.api.ratelimiting import Ratelimiter
|
from synapse.api.ratelimiting import Ratelimiter
|
||||||
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
|
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
|
||||||
from synapse.logging.opentracing import set_tag
|
from synapse.logging.opentracing import set_tag
|
||||||
@@ -45,6 +45,7 @@ from synapse.types import (
|
|||||||
)
|
)
|
||||||
from synapse.util.events import generate_fake_event_id
|
from synapse.util.events import generate_fake_event_id
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
|
from synapse.util.sentinel import Sentinel
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
@@ -146,10 +147,37 @@ class DelayedEventsHandler:
|
|||||||
)
|
)
|
||||||
|
|
||||||
async def _unsafe_process_new_event(self) -> None:
|
async def _unsafe_process_new_event(self) -> None:
|
||||||
|
# We purposefully fetch the current max room stream ordering before
|
||||||
|
# doing anything else, as it could increment duing processing of state
|
||||||
|
# deltas. We want to avoid updating `delayed_events_stream_pos` past
|
||||||
|
# the stream ordering of the state deltas we've processed. Otherwise
|
||||||
|
# we'll leave gaps in our processing.
|
||||||
|
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
||||||
|
|
||||||
|
# Check that there are actually any delayed events to process. If not, bail early.
|
||||||
|
delayed_events_count = await self._store.get_count_of_delayed_events()
|
||||||
|
if delayed_events_count == 0:
|
||||||
|
# There are no delayed events to process. Update the
|
||||||
|
# `delayed_events_stream_pos` to the latest `events` stream pos and
|
||||||
|
# exit early.
|
||||||
|
self._event_pos = room_max_stream_ordering
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
|
||||||
|
room_max_stream_ordering,
|
||||||
|
)
|
||||||
|
|
||||||
|
await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)
|
||||||
|
|
||||||
|
event_processing_positions.labels(
|
||||||
|
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
|
||||||
|
).set(room_max_stream_ordering)
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
# If self._event_pos is None then means we haven't fetched it from the DB yet
|
# If self._event_pos is None then means we haven't fetched it from the DB yet
|
||||||
if self._event_pos is None:
|
if self._event_pos is None:
|
||||||
self._event_pos = await self._store.get_delayed_events_stream_pos()
|
self._event_pos = await self._store.get_delayed_events_stream_pos()
|
||||||
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
|
||||||
if self._event_pos > room_max_stream_ordering:
|
if self._event_pos > room_max_stream_ordering:
|
||||||
# apparently, we've processed more events than exist in the database!
|
# apparently, we've processed more events than exist in the database!
|
||||||
# this can happen if events are removed with history purge or similar.
|
# this can happen if events are removed with history purge or similar.
|
||||||
@@ -167,7 +195,7 @@ class DelayedEventsHandler:
|
|||||||
self._clock, name="delayed_events_delta", server_name=self.server_name
|
self._clock, name="delayed_events_delta", server_name=self.server_name
|
||||||
):
|
):
|
||||||
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
||||||
if self._event_pos == room_max_stream_ordering:
|
if self._event_pos >= room_max_stream_ordering:
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
@@ -202,23 +230,81 @@ class DelayedEventsHandler:
|
|||||||
Process current state deltas to cancel other users' pending delayed events
|
Process current state deltas to cancel other users' pending delayed events
|
||||||
that target the same state.
|
that target the same state.
|
||||||
"""
|
"""
|
||||||
|
# Get the senders of each delta's state event (as sender information is
|
||||||
|
# not currently stored in the `current_state_deltas` table).
|
||||||
|
event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
|
||||||
|
[delta.event_id for delta in deltas if delta.event_id is not None]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Note: No need to batch as `get_current_state_deltas` will only ever
|
||||||
|
# return 100 rows at a time.
|
||||||
for delta in deltas:
|
for delta in deltas:
|
||||||
|
logger.debug(
|
||||||
|
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
|
||||||
|
)
|
||||||
|
|
||||||
|
# `delta.event_id` and `delta.sender` can be `None` in a few valid
|
||||||
|
# cases (see the docstring of
|
||||||
|
# `get_current_state_delta_membership_changes_for_user` for details).
|
||||||
if delta.event_id is None:
|
if delta.event_id is None:
|
||||||
logger.debug(
|
# TODO: Differentiate between this being caused by a state reset
|
||||||
"Not handling delta for deleted state: %r %r",
|
# which removed a user from a room, or the homeserver
|
||||||
|
# purposefully having left the room. We can do so by checking
|
||||||
|
# whether there are any local memberships still left in the
|
||||||
|
# room. If so, then this is the result of a state reset.
|
||||||
|
#
|
||||||
|
# If it is a state reset, we should avoid cancelling new,
|
||||||
|
# delayed state events due to old state resurfacing. So we
|
||||||
|
# should skip and log a warning in this case.
|
||||||
|
#
|
||||||
|
# If the homeserver has left the room, then we should cancel all
|
||||||
|
# delayed state events intended for this room, as there is no
|
||||||
|
# need to try and send a delayed event into a room we've left.
|
||||||
|
logger.warning(
|
||||||
|
"Skipping state delta (%r, %r) without corresponding event ID. "
|
||||||
|
"This can happen if the homeserver has left the room (in which "
|
||||||
|
"case this can be ignored), or if there has been a state reset "
|
||||||
|
"which has caused the sender to be kicked out of the room",
|
||||||
delta.event_type,
|
delta.event_type,
|
||||||
delta.state_key,
|
delta.state_key,
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
logger.debug(
|
sender_str = event_id_and_sender_dict.get(
|
||||||
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
|
delta.event_id, Sentinel.UNSET_SENTINEL
|
||||||
)
|
)
|
||||||
|
if sender_str is None:
|
||||||
event = await self._store.get_event(delta.event_id, allow_none=True)
|
# An event exists, but the `sender` field was "null" and Synapse
|
||||||
if not event:
|
# incorrectly accepted the event. This is not expected.
|
||||||
|
logger.error(
|
||||||
|
"Skipping state delta with event ID '%s' as 'sender' was None. "
|
||||||
|
"This is unexpected - please report it as a bug!",
|
||||||
|
delta.event_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
if sender_str is Sentinel.UNSET_SENTINEL:
|
||||||
|
# We have an event ID, but the event was not found in the
|
||||||
|
# datastore. This can happen if a room, or its history, is
|
||||||
|
# purged. State deltas related to the room are left behind, but
|
||||||
|
# the event no longer exists.
|
||||||
|
#
|
||||||
|
# As we cannot get the sender of this event, we can't calculate
|
||||||
|
# whether to cancel delayed events related to this one. So we skip.
|
||||||
|
logger.debug(
|
||||||
|
"Skipping state delta with event ID '%s' - the room, or its history, may have been purged",
|
||||||
|
delta.event_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
sender = UserID.from_string(sender_str)
|
||||||
|
except SynapseError as e:
|
||||||
|
logger.error(
|
||||||
|
"Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
|
||||||
|
sender_str,
|
||||||
|
e,
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
sender = UserID.from_string(event.sender)
|
|
||||||
|
|
||||||
next_send_ts = await self._store.cancel_delayed_state_events(
|
next_send_ts = await self._store.cancel_delayed_state_events(
|
||||||
room_id=delta.room_id,
|
room_id=delta.room_id,
|
||||||
|
|||||||
@@ -1540,7 +1540,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||||||
self.clock, name="presence_delta", server_name=self.server_name
|
self.clock, name="presence_delta", server_name=self.server_name
|
||||||
):
|
):
|
||||||
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||||
if self._event_pos == room_max_stream_ordering:
|
if self._event_pos >= room_max_stream_ordering:
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
|||||||
@@ -13,7 +13,6 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
|
|
||||||
import enum
|
|
||||||
import logging
|
import logging
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
from typing import (
|
from typing import (
|
||||||
@@ -75,6 +74,7 @@ from synapse.types.handlers.sliding_sync import (
|
|||||||
)
|
)
|
||||||
from synapse.types.state import StateFilter
|
from synapse.types.state import StateFilter
|
||||||
from synapse.util import MutableOverlayMapping
|
from synapse.util import MutableOverlayMapping
|
||||||
|
from synapse.util.sentinel import Sentinel
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
@@ -83,12 +83,6 @@ if TYPE_CHECKING:
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Sentinel(enum.Enum):
|
|
||||||
# defining a sentinel in this way allows mypy to correctly handle the
|
|
||||||
# type of a dictionary lookup and subsequent type narrowing.
|
|
||||||
UNSET_SENTINEL = object()
|
|
||||||
|
|
||||||
|
|
||||||
# Helper definition for the types that we might return. We do this to avoid
|
# Helper definition for the types that we might return. We do this to avoid
|
||||||
# copying data between types (which can be expensive for many rooms).
|
# copying data between types (which can be expensive for many rooms).
|
||||||
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]
|
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]
|
||||||
|
|||||||
@@ -682,6 +682,8 @@ class StateStorageController:
|
|||||||
- the stream id which these results go up to
|
- the stream id which these results go up to
|
||||||
- list of current_state_delta_stream rows. If it is empty, we are
|
- list of current_state_delta_stream rows. If it is empty, we are
|
||||||
up to date.
|
up to date.
|
||||||
|
|
||||||
|
A maximum of 100 rows will be returned.
|
||||||
"""
|
"""
|
||||||
# FIXME(faster_joins): what do we do here?
|
# FIXME(faster_joins): what do we do here?
|
||||||
# https://github.com/matrix-org/synapse/issues/13008
|
# https://github.com/matrix-org/synapse/issues/13008
|
||||||
|
|||||||
@@ -182,6 +182,21 @@ class DelayedEventsStore(SQLBaseStore):
|
|||||||
"restart_delayed_event", restart_delayed_event_txn
|
"restart_delayed_event", restart_delayed_event_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def get_count_of_delayed_events(self) -> int:
|
||||||
|
"""Returns the number of pending delayed events in the DB."""
|
||||||
|
|
||||||
|
def _get_count_of_delayed_events(txn: LoggingTransaction) -> int:
|
||||||
|
sql = "SELECT count(*) FROM delayed_events"
|
||||||
|
|
||||||
|
txn.execute(sql)
|
||||||
|
resp = txn.fetchone()
|
||||||
|
return resp[0] if resp is not None else 0
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_count_of_delayed_events",
|
||||||
|
_get_count_of_delayed_events,
|
||||||
|
)
|
||||||
|
|
||||||
async def get_all_delayed_events_for_user(
|
async def get_all_delayed_events_for_user(
|
||||||
self,
|
self,
|
||||||
user_localpart: str,
|
user_localpart: str,
|
||||||
|
|||||||
@@ -2135,6 +2135,39 @@ class EventsWorkerStore(SQLBaseStore):
|
|||||||
|
|
||||||
return rows, to_token, True
|
return rows, to_token, True
|
||||||
|
|
||||||
|
async def get_senders_for_event_ids(
|
||||||
|
self, event_ids: Collection[str]
|
||||||
|
) -> Dict[str, Optional[str]]:
|
||||||
|
"""
|
||||||
|
Given a sequence of event IDs, return the sender associated with each.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_ids: A collection of event IDs as strings.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dict of event ID -> sender of the event.
|
||||||
|
|
||||||
|
If a given event ID does not exist in the `events` table, then no entry
|
||||||
|
for that event ID will be returned.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _get_senders_for_event_ids(
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
) -> Dict[str, Optional[str]]:
|
||||||
|
rows = self.db_pool.simple_select_many_txn(
|
||||||
|
txn=txn,
|
||||||
|
table="events",
|
||||||
|
column="event_id",
|
||||||
|
iterable=event_ids,
|
||||||
|
keyvalues={},
|
||||||
|
retcols=["event_id", "sender"],
|
||||||
|
)
|
||||||
|
return dict(rows)
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_senders_for_event_ids", _get_senders_for_event_ids
|
||||||
|
)
|
||||||
|
|
||||||
@cached(max_entries=5000)
|
@cached(max_entries=5000)
|
||||||
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
|
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
|
||||||
res = await self.db_pool.simple_select_one(
|
res = await self.db_pool.simple_select_one(
|
||||||
|
|||||||
@@ -94,6 +94,8 @@ class StateDeltasStore(SQLBaseStore):
|
|||||||
- the stream id which these results go up to
|
- the stream id which these results go up to
|
||||||
- list of current_state_delta_stream rows. If it is empty, we are
|
- list of current_state_delta_stream rows. If it is empty, we are
|
||||||
up to date.
|
up to date.
|
||||||
|
|
||||||
|
A maximum of 100 rows will be returned.
|
||||||
"""
|
"""
|
||||||
prev_stream_id = int(prev_stream_id)
|
prev_stream_id = int(prev_stream_id)
|
||||||
|
|
||||||
|
|||||||
21
synapse/util/sentinel.py
Normal file
21
synapse/util/sentinel.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
#
|
||||||
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||||
|
#
|
||||||
|
# Copyright (C) 2025 New Vector, Ltd
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as
|
||||||
|
# published by the Free Software Foundation, either version 3 of the
|
||||||
|
# License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# See the GNU Affero General Public License for more details:
|
||||||
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
|
#
|
||||||
|
|
||||||
|
import enum
|
||||||
|
|
||||||
|
|
||||||
|
class Sentinel(enum.Enum):
|
||||||
|
# defining a sentinel in this way allows mypy to correctly handle the
|
||||||
|
# type of a dictionary lookup and subsequent type narrowing.
|
||||||
|
UNSET_SENTINEL = object()
|
||||||
@@ -20,7 +20,7 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import List, Optional
|
from typing import Dict, List, Optional
|
||||||
|
|
||||||
from twisted.internet.testing import MemoryReactor
|
from twisted.internet.testing import MemoryReactor
|
||||||
|
|
||||||
@@ -39,6 +39,77 @@ from tests.unittest import HomeserverTestCase
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class EventsTestCase(HomeserverTestCase):
|
||||||
|
servlets = [
|
||||||
|
admin.register_servlets,
|
||||||
|
room.register_servlets,
|
||||||
|
login.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
|
def prepare(
|
||||||
|
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
|
||||||
|
) -> None:
|
||||||
|
self._store = self.hs.get_datastores().main
|
||||||
|
|
||||||
|
def test_get_senders_for_event_ids(self) -> None:
|
||||||
|
"""Tests the `get_senders_for_event_ids` storage function."""
|
||||||
|
|
||||||
|
users_and_tokens: Dict[str, str] = {}
|
||||||
|
for localpart_suffix in range(10):
|
||||||
|
localpart = f"user_{localpart_suffix}"
|
||||||
|
user_id = self.register_user(localpart, "rabbit")
|
||||||
|
token = self.login(localpart, "rabbit")
|
||||||
|
|
||||||
|
users_and_tokens[user_id] = token
|
||||||
|
|
||||||
|
room_creator_user_id = self.register_user("room_creator", "rabbit")
|
||||||
|
room_creator_token = self.login("room_creator", "rabbit")
|
||||||
|
users_and_tokens[room_creator_user_id] = room_creator_token
|
||||||
|
|
||||||
|
# Create a room and invite some users.
|
||||||
|
room_id = self.helper.create_room_as(
|
||||||
|
room_creator_user_id, tok=room_creator_token
|
||||||
|
)
|
||||||
|
event_ids_to_senders: Dict[str, str] = {}
|
||||||
|
for user_id, token in users_and_tokens.items():
|
||||||
|
if user_id == room_creator_user_id:
|
||||||
|
continue
|
||||||
|
|
||||||
|
self.helper.invite(
|
||||||
|
room=room_id,
|
||||||
|
targ=user_id,
|
||||||
|
tok=room_creator_token,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Have the user accept the invite and join the room.
|
||||||
|
self.helper.join(
|
||||||
|
room=room_id,
|
||||||
|
user=user_id,
|
||||||
|
tok=token,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Have the user send an event.
|
||||||
|
response = self.helper.send_event(
|
||||||
|
room_id=room_id,
|
||||||
|
type="m.room.message",
|
||||||
|
content={
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": f"hello, I'm {user_id}!",
|
||||||
|
},
|
||||||
|
tok=token,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Record the event ID and sender.
|
||||||
|
event_id = response["event_id"]
|
||||||
|
event_ids_to_senders[event_id] = user_id
|
||||||
|
|
||||||
|
# Check that `get_senders_for_event_ids` returns the correct data.
|
||||||
|
response = self.get_success(
|
||||||
|
self._store.get_senders_for_event_ids(list(event_ids_to_senders.keys()))
|
||||||
|
)
|
||||||
|
self.assert_dict(event_ids_to_senders, response)
|
||||||
|
|
||||||
|
|
||||||
class ExtremPruneTestCase(HomeserverTestCase):
|
class ExtremPruneTestCase(HomeserverTestCase):
|
||||||
servlets = [
|
servlets = [
|
||||||
admin.register_servlets,
|
admin.register_servlets,
|
||||||
|
|||||||
Reference in New Issue
Block a user