mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Pass leave from remote invite rejection down Sliding Sync (#18375)
Fixes #17753 ### Dev notes The `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` database tables were added in https://github.com/element-hq/synapse/pull/17512 ### Pull Request Checklist <!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request --> * [X] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [X] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --------- Co-authored-by: Erik Johnston <erik@matrix.org> Co-authored-by: Olivier 'reivilibre <oliverw@matrix.org> Co-authored-by: Eric Eastwood <erice@element.io>
This commit is contained in:
1
changelog.d/18375.bugfix
Normal file
1
changelog.d/18375.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Pass leave from remote invite rejection down Sliding Sync.
|
||||
@@ -271,6 +271,7 @@ class SlidingSyncHandler:
|
||||
from_token=from_token,
|
||||
to_token=to_token,
|
||||
newly_joined=room_id in interested_rooms.newly_joined_rooms,
|
||||
newly_left=room_id in interested_rooms.newly_left_rooms,
|
||||
is_dm=room_id in interested_rooms.dm_room_ids,
|
||||
)
|
||||
|
||||
@@ -542,6 +543,7 @@ class SlidingSyncHandler:
|
||||
from_token: Optional[SlidingSyncStreamToken],
|
||||
to_token: StreamToken,
|
||||
newly_joined: bool,
|
||||
newly_left: bool,
|
||||
is_dm: bool,
|
||||
) -> SlidingSyncResult.RoomResult:
|
||||
"""
|
||||
@@ -559,6 +561,7 @@ class SlidingSyncHandler:
|
||||
from_token: The point in the stream to sync from.
|
||||
to_token: The point in the stream to sync up to.
|
||||
newly_joined: If the user has newly joined the room
|
||||
newly_left: If the user has newly left the room
|
||||
is_dm: Whether the room is a DM room
|
||||
"""
|
||||
user = sync_config.user
|
||||
@@ -856,6 +859,26 @@ class SlidingSyncHandler:
|
||||
# TODO: Limit the number of state events we're about to send down
|
||||
# the room, if its too many we should change this to an
|
||||
# `initial=True`?
|
||||
|
||||
# For the case of rejecting remote invites, the leave event won't be
|
||||
# returned by `get_current_state_deltas_for_room`. This is due to the current
|
||||
# state only being filled out for rooms the server is in, and so doesn't pick
|
||||
# up out-of-band leaves (including locally rejected invites) as these events
|
||||
# are outliers and not added to the `current_state_delta_stream`.
|
||||
#
|
||||
# We rely on being explicitly told that the room has been `newly_left` to
|
||||
# ensure we extract the out-of-band leave.
|
||||
if newly_left and room_membership_for_user_at_to_token.event_id is not None:
|
||||
membership_changed = True
|
||||
leave_event = await self.store.get_event(
|
||||
room_membership_for_user_at_to_token.event_id
|
||||
)
|
||||
state_key = leave_event.get_state_key()
|
||||
if state_key is not None:
|
||||
room_state_delta_id_map[(leave_event.type, state_key)] = (
|
||||
room_membership_for_user_at_to_token.event_id
|
||||
)
|
||||
|
||||
deltas = await self.get_current_state_deltas_for_room(
|
||||
room_id=room_id,
|
||||
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
|
||||
|
||||
@@ -1120,7 +1120,7 @@ class SlidingSyncRoomLists:
|
||||
(
|
||||
newly_joined_room_ids,
|
||||
newly_left_room_map,
|
||||
) = await self._get_newly_joined_and_left_rooms(
|
||||
) = await self._get_newly_joined_and_left_rooms_fallback(
|
||||
user_id, to_token=to_token, from_token=from_token
|
||||
)
|
||||
|
||||
@@ -1176,6 +1176,53 @@ class SlidingSyncRoomLists:
|
||||
"state reset" out of the room, and so that room would not be part of the
|
||||
"current memberships" of the user.
|
||||
|
||||
Returns:
|
||||
A 2-tuple of newly joined room IDs and a map of newly_left room
|
||||
IDs to the `RoomsForUserStateReset` entry.
|
||||
|
||||
We're using `RoomsForUserStateReset` but that doesn't necessarily mean the
|
||||
user was state reset of the rooms. It's just that the `event_id`/`sender`
|
||||
are optional and we can't tell the difference between the server leaving the
|
||||
room when the user was the last person participating in the room and left or
|
||||
was state reset out of the room. To actually check for a state reset, you
|
||||
need to check if a membership still exists in the room.
|
||||
"""
|
||||
|
||||
newly_joined_room_ids: Set[str] = set()
|
||||
newly_left_room_map: Dict[str, RoomsForUserStateReset] = {}
|
||||
|
||||
if not from_token:
|
||||
return newly_joined_room_ids, newly_left_room_map
|
||||
|
||||
changes = await self.store.get_sliding_sync_membership_changes(
|
||||
user_id,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_token.room_key,
|
||||
excluded_room_ids=set(self.rooms_to_exclude_globally),
|
||||
)
|
||||
|
||||
for room_id, entry in changes.items():
|
||||
if entry.membership == Membership.JOIN:
|
||||
newly_joined_room_ids.add(room_id)
|
||||
elif entry.membership == Membership.LEAVE:
|
||||
newly_left_room_map[room_id] = entry
|
||||
|
||||
return newly_joined_room_ids, newly_left_room_map
|
||||
|
||||
@trace
|
||||
async def _get_newly_joined_and_left_rooms_fallback(
|
||||
self,
|
||||
user_id: str,
|
||||
to_token: StreamToken,
|
||||
from_token: Optional[StreamToken],
|
||||
) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]:
|
||||
"""Fetch the sets of rooms that the user newly joined or left in the
|
||||
given token range.
|
||||
|
||||
Note: there may be rooms in the newly left rooms where the user was
|
||||
"state reset" out of the room, and so that room would not be part of the
|
||||
"current memberships" of the user.
|
||||
|
||||
Returns:
|
||||
A 2-tuple of newly joined room IDs and a map of newly_left room
|
||||
IDs to the `RoomsForUserStateReset` entry.
|
||||
|
||||
@@ -80,6 +80,7 @@ from synapse.storage.database import (
|
||||
)
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
from synapse.storage.roommember import RoomsForUserStateReset
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
@@ -993,6 +994,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
available in the `current_state_delta_stream` table. To actually check for a
|
||||
state reset, you need to check if a membership still exists in the room.
|
||||
"""
|
||||
|
||||
assert from_key.topological is None
|
||||
assert to_key.topological is None
|
||||
|
||||
# Start by ruling out cases where a DB query is not necessary.
|
||||
if from_key == to_key:
|
||||
return []
|
||||
@@ -1138,6 +1143,203 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
if membership_change.room_id not in room_ids_to_exclude
|
||||
]
|
||||
|
||||
@trace
|
||||
async def get_sliding_sync_membership_changes(
|
||||
self,
|
||||
user_id: str,
|
||||
from_key: RoomStreamToken,
|
||||
to_key: RoomStreamToken,
|
||||
excluded_room_ids: Optional[AbstractSet[str]] = None,
|
||||
) -> Dict[str, RoomsForUserStateReset]:
|
||||
"""
|
||||
Fetch membership events that result in a meaningful membership change for a
|
||||
given user.
|
||||
|
||||
A meaningful membership changes is one where the `membership` value actually
|
||||
changes. This means memberships changes from `join` to `join` (like a display
|
||||
name change) will be filtered out since they result in no meaningful change.
|
||||
|
||||
Note: This function only works with "live" tokens with `stream_ordering` only.
|
||||
|
||||
We're looking for membership changes in the token range (> `from_key` and <=
|
||||
`to_key`).
|
||||
|
||||
Args:
|
||||
user_id: The user ID to fetch membership events for.
|
||||
from_key: The point in the stream to sync from (fetching events > this point).
|
||||
to_key: The token to fetch rooms up to (fetching events <= this point).
|
||||
excluded_room_ids: Optional list of room IDs to exclude from the results.
|
||||
|
||||
Returns:
|
||||
All meaningful membership changes to the current state in the token range.
|
||||
Events are sorted by `stream_ordering` ascending.
|
||||
|
||||
`event_id`/`sender` can be `None` when the server leaves a room (meaning
|
||||
everyone locally left) or a state reset which removed the person from the
|
||||
room. We can't tell the difference between the two cases with what's
|
||||
available in the `current_state_delta_stream` table. To actually check for a
|
||||
state reset, you need to check if a membership still exists in the room.
|
||||
"""
|
||||
|
||||
assert from_key.topological is None
|
||||
assert to_key.topological is None
|
||||
|
||||
# Start by ruling out cases where a DB query is not necessary.
|
||||
if from_key == to_key:
|
||||
return {}
|
||||
|
||||
if from_key:
|
||||
has_changed = self._membership_stream_cache.has_entity_changed(
|
||||
user_id, int(from_key.stream)
|
||||
)
|
||||
if not has_changed:
|
||||
return {}
|
||||
|
||||
room_ids_to_exclude: AbstractSet[str] = set()
|
||||
if excluded_room_ids is not None:
|
||||
room_ids_to_exclude = excluded_room_ids
|
||||
|
||||
def f(txn: LoggingTransaction) -> Dict[str, RoomsForUserStateReset]:
|
||||
# To handle tokens with a non-empty instance_map we fetch more
|
||||
# results than necessary and then filter down
|
||||
min_from_id = from_key.stream
|
||||
max_to_id = to_key.get_max_stream_pos()
|
||||
|
||||
# This query looks at membership changes in
|
||||
# `sliding_sync_membership_snapshots` which will not include users
|
||||
# that were state reset out of rooms; so we need to look for that
|
||||
# case in `current_state_delta_stream`.
|
||||
sql = """
|
||||
SELECT
|
||||
room_id,
|
||||
membership_event_id,
|
||||
event_instance_name,
|
||||
event_stream_ordering,
|
||||
membership,
|
||||
sender,
|
||||
prev_membership,
|
||||
room_version
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
s.room_id,
|
||||
s.membership_event_id,
|
||||
s.event_instance_name,
|
||||
s.event_stream_ordering,
|
||||
s.membership,
|
||||
s.sender,
|
||||
m_prev.membership AS prev_membership
|
||||
FROM sliding_sync_membership_snapshots as s
|
||||
LEFT JOIN event_edges AS e ON e.event_id = s.membership_event_id
|
||||
LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = e.prev_event_id
|
||||
WHERE s.user_id = ?
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT
|
||||
s.room_id,
|
||||
e.event_id,
|
||||
s.instance_name,
|
||||
s.stream_id,
|
||||
m.membership,
|
||||
e.sender,
|
||||
m_prev.membership AS prev_membership
|
||||
FROM current_state_delta_stream AS s
|
||||
LEFT JOIN events AS e ON e.event_id = s.event_id
|
||||
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
|
||||
LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id
|
||||
WHERE
|
||||
s.type = ?
|
||||
AND s.state_key = ?
|
||||
) AS c
|
||||
INNER JOIN rooms USING (room_id)
|
||||
WHERE event_stream_ordering > ? AND event_stream_ordering <= ?
|
||||
ORDER BY event_stream_ordering ASC
|
||||
"""
|
||||
|
||||
txn.execute(
|
||||
sql,
|
||||
(user_id, EventTypes.Member, user_id, min_from_id, max_to_id),
|
||||
)
|
||||
|
||||
membership_changes: Dict[str, RoomsForUserStateReset] = {}
|
||||
for (
|
||||
room_id,
|
||||
membership_event_id,
|
||||
event_instance_name,
|
||||
event_stream_ordering,
|
||||
membership,
|
||||
sender,
|
||||
prev_membership,
|
||||
room_version_id,
|
||||
) in txn:
|
||||
assert room_id is not None
|
||||
assert event_stream_ordering is not None
|
||||
|
||||
if room_id in room_ids_to_exclude:
|
||||
continue
|
||||
|
||||
if _filter_results_by_stream(
|
||||
from_key,
|
||||
to_key,
|
||||
event_instance_name,
|
||||
event_stream_ordering,
|
||||
):
|
||||
# When the server leaves a room, it will insert new rows into the
|
||||
# `current_state_delta_stream` table with `event_id = null` for all
|
||||
# current state. This means we might already have a row for the
|
||||
# leave event and then another for the same leave where the
|
||||
# `event_id=null` but the `prev_event_id` is pointing back at the
|
||||
# earlier leave event. We don't want to report the leave, if we
|
||||
# already have a leave event.
|
||||
if (
|
||||
membership_event_id is None
|
||||
and prev_membership == Membership.LEAVE
|
||||
):
|
||||
continue
|
||||
|
||||
if membership_event_id is None and room_id in membership_changes:
|
||||
# SUSPICIOUS: if we join a room and get state reset out of it
|
||||
# in the same queried window,
|
||||
# won't this ignore the 'state reset out of it' part?
|
||||
continue
|
||||
|
||||
# When `s.event_id = null`, we won't be able to get respective
|
||||
# `room_membership` but can assume the user has left the room
|
||||
# because this only happens when the server leaves a room
|
||||
# (meaning everyone locally left) or a state reset which removed
|
||||
# the person from the room.
|
||||
membership = (
|
||||
membership if membership is not None else Membership.LEAVE
|
||||
)
|
||||
|
||||
if membership == prev_membership:
|
||||
# If `membership` and `prev_membership` are the same then this
|
||||
# is not a meaningful change so we can skip it.
|
||||
# An example of this happening is when the user changes their display name.
|
||||
continue
|
||||
|
||||
membership_change = RoomsForUserStateReset(
|
||||
room_id=room_id,
|
||||
sender=sender,
|
||||
membership=membership,
|
||||
event_id=membership_event_id,
|
||||
event_pos=PersistedEventPosition(
|
||||
event_instance_name, event_stream_ordering
|
||||
),
|
||||
room_version_id=room_version_id,
|
||||
)
|
||||
|
||||
membership_changes[room_id] = membership_change
|
||||
|
||||
return membership_changes
|
||||
|
||||
membership_changes = await self.db_pool.runInteraction(
|
||||
"get_sliding_sync_membership_changes", f
|
||||
)
|
||||
|
||||
return membership_changes
|
||||
|
||||
@cancellable
|
||||
async def get_membership_changes_for_user(
|
||||
self,
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2025 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
-- So we can fetch all rooms for a given user sorted by stream order
|
||||
DROP INDEX IF EXISTS sliding_sync_membership_snapshots_user_id;
|
||||
CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_sync_membership_snapshots(user_id, event_stream_ordering);
|
||||
@@ -594,6 +594,12 @@ class ComputeInterestedRoomsTestCase(SlidingSyncBase):
|
||||
the correct list of rooms IDs.
|
||||
"""
|
||||
|
||||
# FIXME: We should refactor these tests to run against `compute_interested_rooms(...)`
|
||||
# instead of just `get_room_membership_for_user_at_to_token(...)` which is only used
|
||||
# in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do
|
||||
# well to stress that logic and we shouldn't remove them just because we're removing
|
||||
# the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623).
|
||||
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
knock.register_servlets,
|
||||
@@ -2976,6 +2982,12 @@ class ComputeInterestedRoomsShardTestCase(
|
||||
sharded event stream_writers enabled
|
||||
"""
|
||||
|
||||
# FIXME: We should refactor these tests to run against `compute_interested_rooms(...)`
|
||||
# instead of just `get_room_membership_for_user_at_to_token(...)` which is only used
|
||||
# in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do
|
||||
# well to stress that logic and we shouldn't remove them just because we're removing
|
||||
# the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623).
|
||||
|
||||
servlets = [
|
||||
admin.register_servlets_for_client_rest_resource,
|
||||
room.register_servlets,
|
||||
|
||||
@@ -790,6 +790,64 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_reject_remote_invite(self) -> None:
|
||||
"""Test that rejecting a remote invite comes down incremental sync"""
|
||||
|
||||
user_id = self.register_user("user1", "pass")
|
||||
user_tok = self.login(user_id, "pass")
|
||||
|
||||
# Create a remote room invite (out-of-band membership)
|
||||
room_id = "!room:remote.server"
|
||||
self._create_remote_invite_room_for_user(user_id, None, room_id)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [(EventTypes.Member, StateValues.ME)],
|
||||
"timeline_limit": 3,
|
||||
}
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user_tok)
|
||||
# We should see the room (like normal)
|
||||
self.assertIncludes(
|
||||
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
|
||||
{room_id},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Reject the remote room invite
|
||||
self.helper.leave(room_id, user_id, tok=user_tok)
|
||||
|
||||
# Sync again after rejecting the invite
|
||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user_tok)
|
||||
|
||||
# The fix to add the leave event to incremental sync when rejecting a remote
|
||||
# invite relies on the new tables to work.
|
||||
if self.use_new_tables:
|
||||
# We should see the newly_left room
|
||||
self.assertIncludes(
|
||||
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
|
||||
{room_id},
|
||||
exact=True,
|
||||
)
|
||||
# We should see the leave state for the room so clients don't end up with stuck
|
||||
# invites
|
||||
self.assertIncludes(
|
||||
{
|
||||
(
|
||||
state["type"],
|
||||
state["state_key"],
|
||||
state["content"].get("membership"),
|
||||
)
|
||||
for state in response_body["rooms"][room_id]["required_state"]
|
||||
},
|
||||
{(EventTypes.Member, user_id, Membership.LEAVE)},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_ignored_user_invites_initial_sync(self) -> None:
|
||||
"""
|
||||
Make sure we ignore invites if they are from one of the `m.ignored_user_list` on
|
||||
|
||||
Reference in New Issue
Block a user