Compare commits

...

12 Commits

Author SHA1 Message Date
Half-Shot
0b997bdd6e Add support for Sliding Sync 2025-07-07 10:38:31 +01:00
Half-Shot
613a7ea847 Add support for appservices. 2025-07-07 10:38:19 +01:00
Half-Shot
5c91913f5a Cleanup handling of legacy sync 2025-07-07 10:37:29 +01:00
Half-Shot
c810460e40 Document db changes. 2025-07-07 10:37:03 +01:00
Half-Shot
e4312d9af8 fix typo 2025-07-07 10:36:31 +01:00
Half-Shot
7a91519fe8 Add stream ID to the AS column 2025-07-07 10:32:05 +01:00
Half-Shot
e6fcab4309 Ensure we store the room version 2025-07-07 10:31:58 +01:00
Half-Shot
2160889a3a Update store methods 2025-07-07 10:31:50 +01:00
Will Hunt
0355e6fd6d Add hacks to sync to send the deleted room down sync. 2025-06-27 17:25:56 +01:00
Will Hunt
1904389b5d Store deleted room members before deleting the room 2025-06-27 17:25:48 +01:00
Will Hunt
8f33017653 Add functions to store and get the deleted room members 2025-06-27 17:25:28 +01:00
Will Hunt
b590056ba7 Add deleted rooms table to schema. 2025-06-27 17:25:15 +01:00
10 changed files with 404 additions and 19 deletions

View File

@@ -345,7 +345,7 @@ def check_state_dependent_auth_rules(
logger.debug("Allowing! %s", event)
return
# 5. If type is m.room.membership
# 5. If type is m.room.member
if event.type == EventTypes.Member:
_is_membership_change_allowed(event.room_version, event, auth_dict)
logger.debug("Allowing! %s", event)

View File

@@ -36,7 +36,7 @@ from prometheus_client import Counter
from twisted.internet import defer
import synapse
from synapse.api.constants import EduTypes, EventTypes
from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.appservice import ApplicationService
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
@@ -139,6 +139,24 @@ class ApplicationServicesHandler:
events_by_room: Dict[str, List[EventBase]] = {}
for event in events:
# We never want to send leave events from deleted rooms to the AS since the events
# may have been purged by the time it's pushed to the AS. Instead, we handle this
# elsewhere.
membership = event.content.get("membership")
state_key = event.get_state_key()
if (
state_key is not None
and event.type == EventTypes.Member
and membership == Membership.LEAVE
):
if await self.store.has_room_been_deleted(event.room_id):
logger.debug(
"Filtering %s from appservice as it's a leave event (%s) from deleted room %s",
event.event_id,
state_key,
event.room_id,
)
continue
events_by_room.setdefault(event.room_id, []).append(event)
async def handle_event(event: EventBase) -> None:
@@ -222,6 +240,18 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
async def notify_room_deletion(
    self, room_id: str, deleted_stream_id: int, users: Iterable[str]
) -> None:
    """Record a catch-up transaction for each appservice interested in a
    deleted room, so the deletion can be pushed to it.

    Args:
        room_id: The room that was deleted.
        deleted_stream_id: The room stream position recorded for the deletion.
        users: The local users who were members of the room.
    """
    for service in self.store.get_app_services():
        # Interest in any member is checked first; the (async) room-interest
        # check only runs when no member matched.
        interested_in_member = any(
            service.is_interested_in_user(user_id) for user_id in users
        )
        if not interested_in_member and not await service.is_interested_in_room(
            room_id, self.store
        ):
            continue
        await self.store.create_appservice_stream_id_txn(service, deleted_stream_id)
def notify_interested_services_ephemeral(
self,
stream_key: StreamKeyType,

View File

@@ -1801,6 +1801,7 @@ class RoomShutdownHandler:
self._replication = hs.get_replication_data_handler()
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
self.event_creation_handler = hs.get_event_creation_handler()
self.as_handler = hs.get_application_service_handler()
self.store = hs.get_datastores().main
async def shutdown_room(
@@ -1920,12 +1921,23 @@ class RoomShutdownHandler:
else:
logger.info("Shutting down room %r", room_id)
users = await self.store.get_local_users_related_to_room(room_id)
for user_id, membership in users:
# If the user is not in the room (or is banned), nothing to do.
if membership not in (Membership.JOIN, Membership.INVITE, Membership.KNOCK):
continue
# If the user is not in the room (or is banned), nothing to do.
users = [
user
for user in await self.store.get_local_users_related_to_room(room_id)
if user[1] in (Membership.JOIN, Membership.INVITE, Membership.KNOCK)
]
# When deleting a room, we want to store the local membership state so that we
# can still send synthetic leaves down sync after the room has been purged (if indeed it has).
# We must do this prior to kicking as otherwise the current_state_events
# table will be empty.
delete_stream_id = await self.store.store_deleted_room_members(room_id)
await self.as_handler.notify_room_deletion(
room_id, delete_stream_id, [user_id for user_id, _membership in users]
)
for user_id, membership in users:
logger.info("Kicking %r from %r...", user_id, room_id)
try:

View File

@@ -1235,6 +1235,10 @@ class SlidingSyncRoomLists:
)
)
deleted_rooms = await self.store.get_deleted_rooms_for_user(
user_id, to_token.room_key.stream
)
# 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
@@ -1295,6 +1299,32 @@ class SlidingSyncRoomLists:
event_pos=last_membership_change_in_from_to_range.event_pos,
room_version_id=await self.store.get_room_version_id(room_id),
)
for room_id, room_version_id, stream_pos in deleted_rooms:
if newly_left_room_map[room_id]:
logger.info(
"Room %s in newly deleted list, not handling for %s",
room_id,
user_id,
)
# It's possible that if the user is syncing at the same time the room is deleted then they will
# see a genuine leave event from the room, so we don't need a synthetic leave.
continue
# Otherwise, generate a synthetic leave to tell clients that the room has been deleted.
logger.info(
"Generating synthetic leave for %s in %s as room was deleted.",
user_id,
room_id,
)
# Note we use RoomsForUserStateReset here as it's ideal for the purpose of a deleted
# room where the event that caused us to leave no longer exists.
newly_left_room_map[room_id] = RoomsForUserStateReset(
room_id=room_id,
sender=None,
membership=Membership.LEAVE,
event_id=None,
event_pos=stream_pos,
room_version_id=room_version_id,
)
# 2) Figure out `newly_joined`
for room_id in possibly_newly_joined_room_ids:

View File

@@ -52,7 +52,7 @@ from synapse.api.constants import (
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.events import EventBase, make_event_from_dict
from synapse.handlers.relations import BundledAggregations
from synapse.logging import issue9533_logger
from synapse.logging.context import current_context
@@ -1829,7 +1829,7 @@ class SyncHandler:
full_state,
)
logger.debug(
logger.info(
"Calculating sync response for %r between %s and %s",
sync_config.user,
sync_result_builder.since_token,
@@ -2386,6 +2386,9 @@ class SyncHandler:
since_token = sync_result_builder.since_token
user_id = sync_result_builder.sync_config.user.to_string()
logger.info(
"Generating _generate_sync_entry_for_rooms for %s %s", user_id, since_token
)
blocks_all_rooms = (
sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
@@ -2427,19 +2430,22 @@ class SyncHandler:
# no point in going further.
if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = await self._have_rooms_changed(sync_result_builder)
have_changed = await self._have_rooms_changed(
sync_result_builder, user_id
)
log_kv({"rooms_have_changed": have_changed})
if not have_changed:
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
if not tags_by_room:
logger.debug("no-oping sync")
logger.info("no-oping sync")
return set(), set()
# 3. Work out which rooms need reporting in the sync response.
ignored_users = await self.store.ignored_users(user_id)
if since_token:
logger.info("With since_token %s %s", user_id, since_token)
room_changes = await self._get_room_changes_for_incremental_sync(
sync_result_builder, ignored_users
)
@@ -2484,7 +2490,7 @@ class SyncHandler:
return set(newly_joined_rooms), set(newly_left_rooms)
async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder"
self, sync_result_builder: "SyncResultBuilder", user_id: str
) -> bool:
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
@@ -2499,6 +2505,13 @@ class SyncHandler:
if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
return True
# If we have any deleted rooms to send down sync (which do not appear down the event paths)
# then also emit a room change.
if await self.store.has_deleted_rooms_for_user(
user_id, since_token.room_key.stream
):
return True
stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
@@ -2757,6 +2770,56 @@ class SyncHandler:
room_entries.append(entry)
deleted_left_rooms = await self.store.get_deleted_rooms_for_user(
user_id, since_token.room_key.stream
)
for room_id, room_version, deleted_stream_id in deleted_left_rooms:
if room_id in newly_left_rooms:
# It's possible that if the user is syncing at the same time the room is deleted then they will
# see a genuine leave event from the room, so we don't need a synthetic leave.
continue
# Otherwise, generate a synthetic leave to tell clients that the room has been deleted.
logger.info(
"Generating synthetic leave for %s in %s as room was deleted.",
user_id,
room_id,
)
# Synthetic leaves for deleted rooms
leave_evt = make_event_from_dict(
{
"state_key": user_id,
"sender": user_id,
"room_id": room_id,
"type": "m.room.member",
"content": {
"membership": "leave",
"reason": "The room has been deleted",
},
},
# We have no idea what the room version is since the room is gone
KNOWN_ROOM_VERSIONS[room_version],
)
# Ensure the event is treated as an outlier since we are not persisting this!
leave_evt.internal_metadata.outlier = True
leave_evt.internal_metadata.out_of_band_membership = True
leave_evt.internal_metadata.stream_ordering = deleted_stream_id.stream
room_entries.append(
RoomSyncResultBuilder(
room_id=room_id,
rtype="archived",
events=[leave_evt],
newly_joined=False,
full_state=False,
since_token=since_token,
upto_token=since_token,
end_token=since_token,
out_of_band=True,
)
)
newly_left_rooms.append(room_id)
return _RoomChanges(
room_entries,
invited,

View File

@@ -30,7 +30,7 @@ from synapse.appservice import (
TransactionUnusedFallbackKeys,
)
from synapse.config.appservice import load_appservices
from synapse.events import EventBase
from synapse.events import EventBase, make_event_from_dict
from synapse.storage._base import db_to_json
from synapse.storage.database import (
DatabasePool,
@@ -319,6 +319,75 @@ class ApplicationServiceTransactionWorkerStore(
"create_appservice_txn", _create_appservice_txn
)
async def get_deleted_room_members_for_appservice(
    self, room_stream_id: int
) -> List[EventBase]:
    """Build synthetic leave events for every member of every room that was
    deleted at the given room stream position.

    Args:
        room_stream_id: The stream position at which the deletion(s) were
            recorded.

    Returns:
        A list of synthetic `m.room.member` leave events, one per deleted
        room member.
    """
    logger.info("Processing deleted rooms stream ID %s", room_stream_id)
    rooms = await self.get_deleted_room_members_at(room_stream_id)
    events: List[EventBase] = []
    for room_id, (room_version, members) in rooms.items():
        events.extend(
            make_event_from_dict(
                {
                    "state_key": user_id,
                    "sender": user_id,
                    "room_id": room_id,
                    "type": "m.room.member",
                    "content": {
                        "membership": "leave",
                        "reason": "The room has been deleted",
                    },
                },
                room_version,
            )
            for user_id in members
        )
    return events
async def create_appservice_stream_id_txn(
    self,
    service: ApplicationService,
    stream_id: int,
) -> AppServiceTransaction:
    """Atomically creates a new transaction for this application service
    with the given room stream token.

    The transaction carries synthetic leave events for rooms deleted at
    `stream_id`; no real events are referenced, so `event_ids` is persisted
    as an empty JSON list and the `stream_id` column is used to rebuild the
    synthetic events when the transaction is later read back.

    Args:
        service: The service who the transaction is for.
        stream_id: The room stream position at which the room deletion was
            recorded.

    Returns:
        A new transaction.
    """
    events = await self.get_deleted_room_members_for_appservice(stream_id)

    def _create_appservice_txn(txn: LoggingTransaction) -> AppServiceTransaction:
        new_txn_id = self._as_txn_seq_gen.get_next_id_txn(txn)

        # Insert new txn into txn table.
        # Bug fix: `event_ids` is read back with `db_to_json`, so it must be
        # stored as JSON text ("[]"), not bound as a raw Python list.
        txn.execute(
            "INSERT INTO application_services_txns(as_id, txn_id, event_ids, stream_id) "
            "VALUES(?,?,?,?)",
            (service.id, new_txn_id, "[]", stream_id),
        )
        return AppServiceTransaction(
            service=service,
            id=new_txn_id,
            events=events,
            ephemeral=[],
            to_device_messages=[],
            one_time_keys_count={},
            unused_fallback_keys={},
            device_list_summary=DeviceListUpdates(),
        )

    return await self.db_pool.runInteraction(
        "create_appservice_txn", _create_appservice_txn
    )
async def complete_appservice_txn(
self, txn_id: int, service: ApplicationService
) -> None:
@@ -354,15 +423,15 @@ class ApplicationServiceTransactionWorkerStore(
def _get_oldest_unsent_txn(
txn: LoggingTransaction,
) -> Optional[Tuple[int, str]]:
) -> Optional[Tuple[int, str, Optional[int]]]:
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
txn.execute(
"SELECT txn_id, event_ids FROM application_services_txns WHERE as_id=?"
"SELECT txn_id, event_ids, stream_id FROM application_services_txns WHERE as_id=?"
" ORDER BY txn_id ASC LIMIT 1",
(service.id,),
)
return cast(Optional[Tuple[int, str]], txn.fetchone())
return cast(Optional[Tuple[int, str, Optional[int]]], txn.fetchone())
entry = await self.db_pool.runInteraction(
"get_oldest_unsent_appservice_txn", _get_oldest_unsent_txn
@@ -371,11 +440,14 @@ class ApplicationServiceTransactionWorkerStore(
if not entry:
return None
txn_id, event_ids_str = entry
txn_id, event_ids_str, room_stream_id = entry
event_ids = db_to_json(event_ids_str)
events = await self.get_events_as_list(event_ids)
if room_stream_id:
events += await self.get_deleted_room_members_for_appservice(room_stream_id)
# TODO: to-device messages, one-time key counts, device list summaries and unused
# fallback keys are not yet populated for catch-up transactions.
# We likely want to populate those for reliability.

View File

@@ -102,6 +102,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
super().__init__(database, db_conn, hs)
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
self._our_server_name = hs.config.server.server_name
if (
self.hs.config.worker.run_background_tasks
@@ -1388,7 +1389,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
rows = cast(
List[Tuple[str, str, str]],
await self.db_pool.simple_select_many_batch(
await self.db_pool.simple_select_onecol(
table="room_memberships",
column="event_id",
iterable=member_event_ids,
@@ -1845,6 +1846,146 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"_get_room_participation_txn", _get_room_participation_txn, user_id, room_id
)
async def store_deleted_room_members(
    self,
    room_id: str,
) -> int:
    """Get all local members of a given room and copy them to
    the deleted room members table.

    This should be run just before a room is deleted (before any
    kicks are made).

    Args:
        room_id: the ID of the room

    Returns:
        The stream ordering recorded for the deletion (the current maximum
        room stream ordering).
    """
    # Renamed from `max` to avoid shadowing the builtin.
    max_stream_id = self.get_room_max_stream_ordering()
    # Bug fix: `get_room_version_id` requires the room ID; the original
    # called it with no arguments.
    room_version = await self.get_room_version_id(room_id)

    def _store_deleted_room_members_txn(
        txn: LoggingTransaction,
    ) -> None:
        # This copies the current membership from the room into the deleted
        # members table, taking care to preserve the old stream ordering for
        # users who were banned or left, otherwise using the latest stream
        # ordering.
        sql = """
            INSERT INTO deleted_room_members (room_id, user_id, deleted_at_stream_id, room_version)
            SELECT room_id, state_key, (CASE
                WHEN (membership = 'ban' OR membership = 'leave') THEN event_stream_ordering
                ELSE ?
            END), ? FROM current_state_events
            WHERE type = 'm.room.member'
            AND room_id = ?
            AND state_key LIKE ?
        """
        # Bug fix: match only local users. User IDs are `@localpart:domain`,
        # so anchor the pattern on the `:` separator — a bare `'%' || server`
        # would also match remote servers whose name merely ends with ours
        # (e.g. `evil-example.com` vs `example.com`).
        txn.execute(
            sql,
            (max_stream_id, room_version, room_id, "%:" + self._our_server_name),
        )

    await self.db_pool.runInteraction(
        "store_deleted_room_members",
        _store_deleted_room_members_txn,
    )
    return max_stream_id
async def get_deleted_rooms_for_user(
    self, user_id: str, stream_pos: int
) -> list[tuple[str, str, PersistedEventPosition]]:
    """Get all rooms and stream positions of deleted rooms to
    send down the user's sync.

    Args:
        user_id: The user to look up deleted-room memberships for.
        stream_pos: Only deletions strictly after this room stream position
            are returned.

    Returns:
        A list of (room_id, room_version, stream position) tuples.
    """

    def _get_deleted_rooms_for_user(
        txn: LoggingTransaction,
    ) -> list[tuple[str, str, PersistedEventPosition]]:
        sql = """
            SELECT room_id, room_version, deleted_at_stream_id FROM deleted_room_members
            WHERE user_id = ?
            AND ? < deleted_at_stream_id
        """
        txn.execute(sql, (user_id, stream_pos))
        # NOTE(review): the instance name is hard-coded to "master" — confirm
        # this is correct on deployments with event-persister workers.
        return [
            (room_id, room_version, PersistedEventPosition("master", stream_id))
            for room_id, room_version, stream_id in txn
        ]

    return await self.db_pool.runInteraction(
        "get_deleted_rooms_for_user", _get_deleted_rooms_for_user
    )
async def has_deleted_rooms_for_user(self, user_id: str, stream_pos: int) -> bool:
    """Checks if the user has any outstanding deleted rooms to send
    down sync.

    Returns true if there are rooms, otherwise false.
    """

    def _has_deleted_rooms_for_user(txn: LoggingTransaction) -> bool:
        # LIMIT 1 — we only need existence, not the full set of rows.
        txn.execute(
            """
            SELECT 1 FROM deleted_room_members
            WHERE user_id = ?
            AND ? < deleted_at_stream_id
            LIMIT 1
            """,
            (user_id, stream_pos),
        )
        return txn.fetchone() is not None

    return await self.db_pool.runInteraction(
        "has_deleted_rooms_for_user", _has_deleted_rooms_for_user
    )
async def get_deleted_room_members_at(
    self, stream_pos: int
) -> dict[str, tuple[str, list[str]]]:
    """Gets any rooms and users that have been deleted globally at the given
    stream position.

    Args:
        stream_pos: The exact `deleted_at_stream_id` to match.

    Returns:
        A map of room_id to a (room_version, list of member user IDs) tuple.
    """

    def _get_deleted_room_members_at(
        txn: LoggingTransaction,
    ) -> dict[str, tuple[str, list[str]]]:
        # Bug fix: no LIMIT — we need every member of every room deleted at
        # this stream position, not just the first row.
        sql = """
            SELECT room_id, room_version, user_id FROM deleted_room_members
            WHERE deleted_at_stream_id = ?
        """
        txn.execute(sql, (stream_pos,))
        room_map: dict[str, tuple[str, list[str]]] = {}
        for room_id, room_version, user_id in txn:
            if room_id not in room_map:
                room_map[room_id] = (room_version, [])
            # Bug fix: append to the member list inside the tuple; the
            # original called `.append` on the tuple itself, which raises
            # AttributeError.
            room_map[room_id][1].append(user_id)
        return room_map

    return await self.db_pool.runInteraction(
        "get_deleted_room_members_at", _get_deleted_room_members_at
    )
@cached()
async def has_room_been_deleted(self, room_id: str) -> bool:
    """Checks if a room has been deleted.

    Returns True if there are any members in the deleted room, otherwise False.
    """

    def _has_room_been_deleted(txn: LoggingTransaction) -> bool:
        # Existence check only, hence LIMIT 1.
        txn.execute(
            """
            SELECT 1 FROM deleted_room_members
            WHERE room_id = ?
            LIMIT 1
            """,
            (room_id,),
        )
        return txn.fetchone() is not None

    return await self.db_pool.runInteraction(
        "has_room_been_deleted", _has_room_been_deleted
    )
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(

View File

@@ -19,7 +19,7 @@
#
#
SCHEMA_VERSION = 92 # remember to update the list below when updating
SCHEMA_VERSION = 93 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
@@ -168,6 +168,9 @@ Changes in SCHEMA_VERSION = 91
Changes in SCHEMA_VERSION = 92
- Cleaned up a trigger that was added in #18260 and then reverted.
Changes in SCHEMA_VERSION = 93:
- Add `deleted_room_members` table.
- Add `stream_id` to the `application_services_txns` table.
"""

View File

@@ -0,0 +1,20 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Records the local members of a room at the point the room was deleted, so
-- that synthetic leave events can still be generated after the room's real
-- events have been purged.
CREATE TABLE deleted_room_members (
    room_id TEXT NOT NULL,
    -- The room's version, kept here because the room's own rows may no longer
    -- exist by the time this table is read.
    room_version TEXT NOT NULL,
    user_id TEXT NOT NULL,
    -- The room stream position recorded when the room was deleted.
    deleted_at_stream_id bigint NOT NULL
);

-- Each (room, user) pair is recorded at most once.
CREATE UNIQUE INDEX deleted_room_member_idx ON deleted_room_members(room_id, user_id);

View File

@@ -0,0 +1,14 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Room stream position of a room deletion attached to an appservice
-- transaction; used to regenerate synthetic leave events when the transaction
-- is read back. NULL for ordinary transactions.
ALTER TABLE application_services_txns ADD COLUMN stream_id BIGINT;