Compare commits

...

23 Commits

Author SHA1 Message Date
Eric Eastwood
9cd2098c50 Fixup reversed filter logic 2025-05-06 16:22:28 -05:00
Eric Eastwood
d6eb04a911 Fixup function 2025-05-06 16:12:38 -05:00
Eric Eastwood
dd4104cabe Fix cache messing up our self-leave after to_token fetching
See https://github.com/element-hq/synapse/pull/18399#discussion_r2076177350
2025-05-06 15:59:03 -05:00
Eric Eastwood
addb43c66b Better changelog 2025-05-06 14:51:21 -05:00
Eric Eastwood
2e520f530c Remove debug logs 2025-05-06 14:32:12 -05:00
Eric Eastwood
5fadd6169e Align more tests to use assertIncludes 2025-05-06 14:30:42 -05:00
Eric Eastwood
59b9cffc50 Fix sharded event persisting test case 2025-05-06 14:27:00 -05:00
Eric Eastwood
94efd8b9ff Fix tests 2025-05-06 14:16:32 -05:00
Eric Eastwood
2e2b8bf36d Fix changes after token showing up in new path 2025-05-06 14:07:29 -05:00
Eric Eastwood
e4b9d01b4c Refactor to look at room ID's in actual list 2025-05-06 13:39:17 -05:00
Eric Eastwood
2fe8e355ce Fix test in fallback path (more loose criteria) 2025-05-06 13:19:18 -05:00
Eric Eastwood
4ad96716a8 Align other test with real state reset 2025-05-06 13:19:18 -05:00
Eric Eastwood
235a52eb9d Make state reset test more real and passes with new Sliding Sync path 2025-05-06 13:19:18 -05:00
Eric Eastwood
6c4e8779fd Add better notes on how why we do this specific logic 2025-05-06 13:19:18 -05:00
Eric Eastwood
a980e10445 Fix missing self-leave rooms when looking at token range before the leave 2025-05-06 13:19:13 -05:00
Eric Eastwood
1794c552ca Revert "Remove extra copies"
This reverts commit d2a4179960e266dc35a06e28c08015570c9a4b21.
2025-05-06 13:18:37 -05:00
Eric Eastwood
1a046bf179 Remove extra copies 2025-05-06 13:18:37 -05:00
Eric Eastwood
1b4eb2bfa2 Add extra test for not newly_joined or newly_left display name change 2025-05-06 13:18:37 -05:00
Devon Hudson
02d76576b3 Merge branch 'develop' into devon/ss_test_refactor 2025-05-06 15:45:32 +00:00
Devon Hudson
de80574391 Remove leave membership filter when getting rooms for user 2025-05-06 09:45:03 -06:00
Devon Hudson
a434892773 Rename test class 2025-05-06 09:41:13 -06:00
Devon Hudson
cd4520ed5f Add changelog entry 2025-05-05 15:57:14 -06:00
Devon Hudson
92b53e4f8c Convert tests to use compute_interested_rooms 2025-05-05 15:54:57 -06:00
7 changed files with 1240 additions and 439 deletions

1
changelog.d/18399.misc Normal file
View File

@@ -0,0 +1 @@
Refactor [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Simplified Sliding Sync room list tests to cover both new and fallback logic paths.

View File

@@ -244,14 +244,47 @@ class SlidingSyncRoomLists:
# Note: this won't include rooms the user has left themselves. We add back
# `newly_left` rooms below. This is more efficient than fetching all rooms and
# then filtering out the old left rooms.
room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user(
user_id
room_membership_for_user_map = (
await self.store.get_sliding_sync_rooms_for_user_from_membership_snapshots(
user_id
)
)
# To play nice with the rewind logic below, we need to go fetch the rooms the
# user has left themselves but only if it changed after the `to_token`.
#
# If a leave happens *after* the token range, we may have still been joined (or
# any non-self-leave which is relevant to sync) to the room before so we need to
# include it in the list of potentially relevant rooms and apply our rewind
# logic (outside of this function) to see if it's actually relevant.
#
# We do this separately from
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` as those results
# are cached and the `to_token` isn't very cache friendly (people are constantly
# requesting with new tokens) so we separate it out here.
self_leave_room_membership_for_user_map = (
await self.store.get_sliding_sync_self_leave_rooms_after_to_token(
user_id, to_token
)
)
if self_leave_room_membership_for_user_map:
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
room_membership_for_user_map.update(self_leave_room_membership_for_user_map)
# Remove invites from ignored users
ignored_users = await self.store.ignored_users(user_id)
if ignored_users:
# TODO: It would be nice to avoid these copies
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
# Make a copy so we don't run into an error: `dictionary changed size during
# iteration`, when we remove items
@@ -263,11 +296,23 @@ class SlidingSyncRoomLists:
):
room_membership_for_user_map.pop(room_id, None)
(
newly_joined_room_ids,
newly_left_room_map,
) = await self._get_newly_joined_and_left_rooms(
user_id, from_token=from_token, to_token=to_token
)
changes = await self._get_rewind_changes_to_current_membership_to_token(
sync_config.user, room_membership_for_user_map, to_token=to_token
)
if changes:
# TODO: It would be nice to avoid these copies
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
for room_id, change in changes.items():
if change is None:
@@ -278,7 +323,7 @@ class SlidingSyncRoomLists:
existing_room = room_membership_for_user_map.get(room_id)
if existing_room is not None:
# Update room membership events to the point in time of the `to_token`
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_for_user = RoomsForUserSlidingSync(
room_id=room_id,
sender=change.sender,
membership=change.membership,
@@ -290,18 +335,18 @@ class SlidingSyncRoomLists:
room_type=existing_room.room_type,
is_encrypted=existing_room.is_encrypted,
)
(
newly_joined_room_ids,
newly_left_room_map,
) = await self._get_newly_joined_and_left_rooms(
user_id, from_token=from_token, to_token=to_token
)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=room_for_user,
newly_left=room_id in newly_left_room_map,
):
room_membership_for_user_map[room_id] = room_for_user
else:
room_membership_for_user_map.pop(room_id, None)
# Add back `newly_left` rooms (rooms left in the from -> to token range).
#
# We do this because `get_sliding_sync_rooms_for_user(...)` doesn't include
# We do this because `get_sliding_sync_rooms_for_user_from_membership_snapshots(...)` doesn't include
# rooms that the user left themselves as it's more efficient to add them back
# here than to fetch all rooms and then filter out the old left rooms. The user
# only leaves a room once in a blue moon so this barely needs to run.
@@ -310,7 +355,12 @@ class SlidingSyncRoomLists:
newly_left_room_map.keys() - room_membership_for_user_map.keys()
)
if missing_newly_left_rooms:
# TODO: It would be nice to avoid these copies
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
for room_id in missing_newly_left_rooms:
newly_left_room_for_user = newly_left_room_map[room_id]
@@ -327,14 +377,21 @@ class SlidingSyncRoomLists:
# If the membership exists, it's just a normal user left the room on
# their own
if newly_left_room_for_user_sliding_sync is not None:
room_membership_for_user_map[room_id] = (
newly_left_room_for_user_sliding_sync
)
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=newly_left_room_for_user_sliding_sync,
newly_left=room_id in newly_left_room_map,
):
room_membership_for_user_map[room_id] = (
newly_left_room_for_user_sliding_sync
)
else:
room_membership_for_user_map.pop(room_id, None)
change = changes.get(room_id)
if change is not None:
# Update room membership events to the point in time of the `to_token`
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_for_user = RoomsForUserSlidingSync(
room_id=room_id,
sender=change.sender,
membership=change.membership,
@@ -346,6 +403,14 @@ class SlidingSyncRoomLists:
room_type=newly_left_room_for_user_sliding_sync.room_type,
is_encrypted=newly_left_room_for_user_sliding_sync.is_encrypted,
)
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=room_for_user,
newly_left=room_id in newly_left_room_map,
):
room_membership_for_user_map[room_id] = room_for_user
else:
room_membership_for_user_map.pop(room_id, None)
# If we are `newly_left` from the room but can't find any membership,
# then we have been "state reset" out of the room
@@ -367,7 +432,7 @@ class SlidingSyncRoomLists:
newly_left_room_for_user.event_pos.to_room_stream_token(),
)
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_for_user = RoomsForUserSlidingSync(
room_id=room_id,
sender=newly_left_room_for_user.sender,
membership=newly_left_room_for_user.membership,
@@ -378,6 +443,16 @@ class SlidingSyncRoomLists:
room_type=room_type,
is_encrypted=is_encrypted,
)
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=room_for_user,
newly_left=room_id in newly_left_room_map,
):
room_membership_for_user_map[room_id] = room_for_user
else:
room_membership_for_user_map.pop(room_id, None)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
if sync_config.lists:
sync_room_map = room_membership_for_user_map
@@ -493,7 +568,12 @@ class SlidingSyncRoomLists:
if sync_config.room_subscriptions:
with start_active_span("assemble_room_subscriptions"):
# TODO: It would be nice to avoid these copies
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
# Find which rooms are partially stated and may need to be filtered out

View File

@@ -130,7 +130,7 @@ class SQLBaseStore(metaclass=ABCMeta):
"_get_rooms_for_local_user_where_membership_is_inner", (user_id,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", (user_id,)
"get_sliding_sync_rooms_for_user_from_membership_snapshots", (user_id,)
)
# Purge other caches based on room state.
@@ -138,7 +138,9 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
def _invalidate_state_caches_all(self, room_id: str) -> None:
"""Invalidates caches that are based on the current state, but does
@@ -168,7 +170,9 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]

View File

@@ -307,7 +307,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
"get_rooms_for_user", (data.state_key,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined]
elif data.type == EventTypes.RoomEncryption:
@@ -319,7 +319,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
if (data.type, data.state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
elif row.type == EventsStreamAllStateRow.TypeId:
assert isinstance(data, EventsStreamAllStateRow)
@@ -330,7 +330,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
@@ -394,7 +396,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
"_get_rooms_for_local_user_where_membership_is_inner", (state_key,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", (state_key,)
"get_sliding_sync_rooms_for_user_from_membership_snapshots",
(state_key,),
)
self._attempt_to_invalidate_cache(
@@ -413,7 +416,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
if (etype, state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
if relates_to:
self._attempt_to_invalidate_cache(
@@ -470,7 +475,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache(
"_get_rooms_for_local_user_where_membership_is_inner", None
)
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_references_for_event", None)
@@ -529,7 +536,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache(
"get_current_hosts_in_room_ordered", (room_id,)
)
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)

View File

@@ -53,6 +53,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.stream import _filter_results_by_stream
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.roommember import (
MemberSummary,
@@ -65,6 +66,7 @@ from synapse.types import (
PersistedEventPosition,
StateMap,
StrCollection,
StreamToken,
get_domain_from_id,
)
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -1389,7 +1391,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
txn, self.get_forgotten_rooms_for_user, (user_id,)
)
self._invalidate_cache_and_stream(
txn, self.get_sliding_sync_rooms_for_user, (user_id,)
txn,
self.get_sliding_sync_rooms_for_user_from_membership_snapshots,
(user_id,),
)
await self.db_pool.runInteraction("forget_membership", f)
@@ -1421,25 +1425,30 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
)
@cached(iterable=True, max_entries=10000)
async def get_sliding_sync_rooms_for_user(
self,
user_id: str,
async def get_sliding_sync_rooms_for_user_from_membership_snapshots(
self, user_id: str
) -> Mapping[str, RoomsForUserSlidingSync]:
"""Get all the rooms for a user to handle a sliding sync request.
"""
Get all the rooms for a user to handle a sliding sync request from the
`sliding_sync_membership_snapshots` table. These will be current memberships and
need to be rewound to the token range.
Ignores forgotten rooms and rooms that the user has left themselves.
Args:
user_id: The user ID to get the rooms for.
Returns:
Map from room ID to membership info
"""
def get_sliding_sync_rooms_for_user_txn(
def _txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
# XXX: If you use any new columns that can change (like from
# `sliding_sync_joined_rooms` or `forgotten`), make sure to bust the
# `get_sliding_sync_rooms_for_user` cache in the appropriate places (and add
# tests).
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` cache in the
# appropriate places (and add tests).
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
@@ -1455,6 +1464,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
AND (m.membership != 'leave' OR m.user_id != m.sender)
"""
txn.execute(sql, (user_id,))
return {
row[0]: RoomsForUserSlidingSync(
room_id=row[0],
@@ -1475,8 +1485,113 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
}
return await self.db_pool.runInteraction(
"get_sliding_sync_rooms_for_user",
get_sliding_sync_rooms_for_user_txn,
"get_sliding_sync_rooms_for_user_from_membership_snapshots",
_txn,
)
async def get_sliding_sync_self_leave_rooms_after_to_token(
self,
user_id: str,
to_token: StreamToken,
) -> Dict[str, RoomsForUserSlidingSync]:
"""
Get all the self-leave rooms for a user after the `to_token` (outside the token
range) that are potentially relevant[1] and needed to handle a sliding sync
request. The results are from the `sliding_sync_membership_snapshots` table and
will be current memberships and need to be rewound to the token range.
[1] If a leave happens after the token range, we may have still been joined (or
any non-self-leave which is relevant to sync) to the room before so we need to
include it in the list of potentially relevant rooms and apply
our rewind logic (outside of this function) to see if it's actually relevant.
This is basically a sister-function to
`get_sliding_sync_rooms_for_user_from_membership_snapshots`. We could
alternatively incorporate this logic into
`get_sliding_sync_rooms_for_user_from_membership_snapshots` but those results
are cached and the `to_token` isn't very cache friendly (people are constantly
requesting with new tokens) so we separate it out here.
Args:
user_id: The user ID to get the rooms for.
to_token: Any self-leave memberships after this position will be returned.
Returns:
Map from room ID to membership info
"""
# TODO: Potential to check
# `self._membership_stream_cache.has_entity_changed(...)` as an early-return
# shortcut.
def _txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
m.event_instance_name, m.event_stream_ordering,
m.has_known_state,
m.room_type,
m.is_encrypted
FROM sliding_sync_membership_snapshots AS m
INNER JOIN rooms AS r USING (room_id)
WHERE user_id = ?
AND m.forgotten = 0
AND m.membership = 'leave'
AND m.user_id = m.sender
AND (m.event_stream_ordering > ?)
"""
# If a leave happens after the token range, we may have still been joined
# (or any non-self-leave which is relevant to sync) to the room before so we
# need to include it in the list of potentially relevant rooms and apply our
# rewind logic (outside of this function).
#
# To handle tokens with a non-empty instance_map we fetch more
# results than necessary and then filter down
min_to_token_position = to_token.room_key.stream
txn.execute(sql, (user_id, min_to_token_position))
# Map from room_id to membership info
room_membership_for_user_map: Dict[str, RoomsForUserSlidingSync] = {}
for row in txn:
room_for_user = RoomsForUserSlidingSync(
room_id=row[0],
sender=row[1],
membership=row[2],
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
has_known_state=bool(row[7]),
room_type=row[8],
is_encrypted=bool(row[9]),
)
# We filter out unknown room versions proactively. They shouldn't go
# down sync and their metadata may be in a broken state (causing
# errors).
if row[4] not in KNOWN_ROOM_VERSIONS:
continue
# We only want to include the self-leave membership if it happened after
# the token range.
#
# Since the database pulls out more than necessary, we need to filter it
# down here.
if _filter_results_by_stream(
lower_token=None,
upper_token=to_token.room_key,
instance_name=room_for_user.event_pos.instance_name,
stream_ordering=room_for_user.event_pos.stream,
):
continue
room_membership_for_user_map[room_for_user.room_id] = room_for_user
return room_membership_for_user_map
return await self.db_pool.runInteraction(
"get_sliding_sync_self_leave_rooms_after_to_token",
_txn,
)
async def get_sliding_sync_room_for_user(

View File

@@ -453,6 +453,8 @@ def _filter_results_by_stream(
stream_ordering falls between the two tokens (taking a None
token to mean unbounded).
The token range is defined by > `lower_token` and <= `upper_token`.
Used to filter results from fetching events in the DB against the given
tokens. This is necessary to handle the case where the tokens include
position maps, which we handle by fetching more than necessary from the DB

File diff suppressed because it is too large Load Diff