mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
23 Commits
kegan/send
...
devon/ss_t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9cd2098c50 | ||
|
|
d6eb04a911 | ||
|
|
dd4104cabe | ||
|
|
addb43c66b | ||
|
|
2e520f530c | ||
|
|
5fadd6169e | ||
|
|
59b9cffc50 | ||
|
|
94efd8b9ff | ||
|
|
2e2b8bf36d | ||
|
|
e4b9d01b4c | ||
|
|
2fe8e355ce | ||
|
|
4ad96716a8 | ||
|
|
235a52eb9d | ||
|
|
6c4e8779fd | ||
|
|
a980e10445 | ||
|
|
1794c552ca | ||
|
|
1a046bf179 | ||
|
|
1b4eb2bfa2 | ||
|
|
02d76576b3 | ||
|
|
de80574391 | ||
|
|
a434892773 | ||
|
|
cd4520ed5f | ||
|
|
92b53e4f8c |
1
changelog.d/18399.misc
Normal file
1
changelog.d/18399.misc
Normal file
@@ -0,0 +1 @@
|
||||
Refactor [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Simplified Sliding Sync room list tests to cover both new and fallback logic paths.
|
||||
@@ -244,14 +244,47 @@ class SlidingSyncRoomLists:
|
||||
# Note: this won't include rooms the user has left themselves. We add back
|
||||
# `newly_left` rooms below. This is more efficient than fetching all rooms and
|
||||
# then filtering out the old left rooms.
|
||||
room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user(
|
||||
user_id
|
||||
room_membership_for_user_map = (
|
||||
await self.store.get_sliding_sync_rooms_for_user_from_membership_snapshots(
|
||||
user_id
|
||||
)
|
||||
)
|
||||
# To play nice with the rewind logic below, we need to go fetch the rooms the
|
||||
# user has left themselves but only if it changed after the `to_token`.
|
||||
#
|
||||
# If a leave happens *after* the token range, we may have still been joined (or
|
||||
# any non-self-leave which is relevant to sync) to the room before so we need to
|
||||
# include it in the list of potentially relevant rooms and apply our rewind
|
||||
# logic (outside of this function) to see if it's actually relevant.
|
||||
#
|
||||
# We do this separately from
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` as those results
|
||||
# are cached and the `to_token` isn't very cache friendly (people are constantly
|
||||
# requesting with new tokens) so we separate it out here.
|
||||
self_leave_room_membership_for_user_map = (
|
||||
await self.store.get_sliding_sync_self_leave_rooms_after_to_token(
|
||||
user_id, to_token
|
||||
)
|
||||
)
|
||||
if self_leave_room_membership_for_user_map:
|
||||
# FIXME: It would be nice to avoid this copy but since
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
|
||||
# can't return a mutable value like a `dict`. We make the copy to get a
|
||||
# mutable dict that we can change. We try to only make a copy when necessary
|
||||
# (if we actually need to change something) as in most cases, the logic
|
||||
# doesn't need to run.
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
room_membership_for_user_map.update(self_leave_room_membership_for_user_map)
|
||||
|
||||
# Remove invites from ignored users
|
||||
ignored_users = await self.store.ignored_users(user_id)
|
||||
if ignored_users:
|
||||
# TODO: It would be nice to avoid these copies
|
||||
# FIXME: It would be nice to avoid this copy but since
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
|
||||
# can't return a mutable value like a `dict`. We make the copy to get a
|
||||
# mutable dict that we can change. We try to only make a copy when necessary
|
||||
# (if we actually need to change something) as in most cases, the logic
|
||||
# doesn't need to run.
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
# Make a copy so we don't run into an error: `dictionary changed size during
|
||||
# iteration`, when we remove items
|
||||
@@ -263,11 +296,23 @@ class SlidingSyncRoomLists:
|
||||
):
|
||||
room_membership_for_user_map.pop(room_id, None)
|
||||
|
||||
(
|
||||
newly_joined_room_ids,
|
||||
newly_left_room_map,
|
||||
) = await self._get_newly_joined_and_left_rooms(
|
||||
user_id, from_token=from_token, to_token=to_token
|
||||
)
|
||||
|
||||
changes = await self._get_rewind_changes_to_current_membership_to_token(
|
||||
sync_config.user, room_membership_for_user_map, to_token=to_token
|
||||
)
|
||||
if changes:
|
||||
# TODO: It would be nice to avoid these copies
|
||||
# FIXME: It would be nice to avoid this copy but since
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
|
||||
# can't return a mutable value like a `dict`. We make the copy to get a
|
||||
# mutable dict that we can change. We try to only make a copy when necessary
|
||||
# (if we actually need to change something) as in most cases, the logic
|
||||
# doesn't need to run.
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
for room_id, change in changes.items():
|
||||
if change is None:
|
||||
@@ -278,7 +323,7 @@ class SlidingSyncRoomLists:
|
||||
existing_room = room_membership_for_user_map.get(room_id)
|
||||
if existing_room is not None:
|
||||
# Update room membership events to the point in time of the `to_token`
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_for_user = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=change.sender,
|
||||
membership=change.membership,
|
||||
@@ -290,18 +335,18 @@ class SlidingSyncRoomLists:
|
||||
room_type=existing_room.room_type,
|
||||
is_encrypted=existing_room.is_encrypted,
|
||||
)
|
||||
|
||||
(
|
||||
newly_joined_room_ids,
|
||||
newly_left_room_map,
|
||||
) = await self._get_newly_joined_and_left_rooms(
|
||||
user_id, from_token=from_token, to_token=to_token
|
||||
)
|
||||
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
|
||||
if filter_membership_for_sync(
|
||||
user_id=user_id,
|
||||
room_membership_for_user=room_for_user,
|
||||
newly_left=room_id in newly_left_room_map,
|
||||
):
|
||||
room_membership_for_user_map[room_id] = room_for_user
|
||||
else:
|
||||
room_membership_for_user_map.pop(room_id, None)
|
||||
|
||||
# Add back `newly_left` rooms (rooms left in the from -> to token range).
|
||||
#
|
||||
# We do this because `get_sliding_sync_rooms_for_user(...)` doesn't include
|
||||
# We do this because `get_sliding_sync_rooms_for_user_from_membership_snapshots(...)` doesn't include
|
||||
# rooms that the user left themselves as it's more efficient to add them back
|
||||
# here than to fetch all rooms and then filter out the old left rooms. The user
|
||||
# only leaves a room once in a blue moon so this barely needs to run.
|
||||
@@ -310,7 +355,12 @@ class SlidingSyncRoomLists:
|
||||
newly_left_room_map.keys() - room_membership_for_user_map.keys()
|
||||
)
|
||||
if missing_newly_left_rooms:
|
||||
# TODO: It would be nice to avoid these copies
|
||||
# FIXME: It would be nice to avoid this copy but since
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
|
||||
# can't return a mutable value like a `dict`. We make the copy to get a
|
||||
# mutable dict that we can change. We try to only make a copy when necessary
|
||||
# (if we actually need to change something) as in most cases, the logic
|
||||
# doesn't need to run.
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
for room_id in missing_newly_left_rooms:
|
||||
newly_left_room_for_user = newly_left_room_map[room_id]
|
||||
@@ -327,14 +377,21 @@ class SlidingSyncRoomLists:
|
||||
# If the membership exists, it's just a normal user left the room on
|
||||
# their own
|
||||
if newly_left_room_for_user_sliding_sync is not None:
|
||||
room_membership_for_user_map[room_id] = (
|
||||
newly_left_room_for_user_sliding_sync
|
||||
)
|
||||
if filter_membership_for_sync(
|
||||
user_id=user_id,
|
||||
room_membership_for_user=newly_left_room_for_user_sliding_sync,
|
||||
newly_left=room_id in newly_left_room_map,
|
||||
):
|
||||
room_membership_for_user_map[room_id] = (
|
||||
newly_left_room_for_user_sliding_sync
|
||||
)
|
||||
else:
|
||||
room_membership_for_user_map.pop(room_id, None)
|
||||
|
||||
change = changes.get(room_id)
|
||||
if change is not None:
|
||||
# Update room membership events to the point in time of the `to_token`
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_for_user = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=change.sender,
|
||||
membership=change.membership,
|
||||
@@ -346,6 +403,14 @@ class SlidingSyncRoomLists:
|
||||
room_type=newly_left_room_for_user_sliding_sync.room_type,
|
||||
is_encrypted=newly_left_room_for_user_sliding_sync.is_encrypted,
|
||||
)
|
||||
if filter_membership_for_sync(
|
||||
user_id=user_id,
|
||||
room_membership_for_user=room_for_user,
|
||||
newly_left=room_id in newly_left_room_map,
|
||||
):
|
||||
room_membership_for_user_map[room_id] = room_for_user
|
||||
else:
|
||||
room_membership_for_user_map.pop(room_id, None)
|
||||
|
||||
# If we are `newly_left` from the room but can't find any membership,
|
||||
# then we have been "state reset" out of the room
|
||||
@@ -367,7 +432,7 @@ class SlidingSyncRoomLists:
|
||||
newly_left_room_for_user.event_pos.to_room_stream_token(),
|
||||
)
|
||||
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_for_user = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=newly_left_room_for_user.sender,
|
||||
membership=newly_left_room_for_user.membership,
|
||||
@@ -378,6 +443,16 @@ class SlidingSyncRoomLists:
|
||||
room_type=room_type,
|
||||
is_encrypted=is_encrypted,
|
||||
)
|
||||
if filter_membership_for_sync(
|
||||
user_id=user_id,
|
||||
room_membership_for_user=room_for_user,
|
||||
newly_left=room_id in newly_left_room_map,
|
||||
):
|
||||
room_membership_for_user_map[room_id] = room_for_user
|
||||
else:
|
||||
room_membership_for_user_map.pop(room_id, None)
|
||||
|
||||
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
|
||||
|
||||
if sync_config.lists:
|
||||
sync_room_map = room_membership_for_user_map
|
||||
@@ -493,7 +568,12 @@ class SlidingSyncRoomLists:
|
||||
|
||||
if sync_config.room_subscriptions:
|
||||
with start_active_span("assemble_room_subscriptions"):
|
||||
# TODO: It would be nice to avoid these copies
|
||||
# FIXME: It would be nice to avoid this copy but since
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
|
||||
# can't return a mutable value like a `dict`. We make the copy to get a
|
||||
# mutable dict that we can change. We try to only make a copy when necessary
|
||||
# (if we actually need to change something) as in most cases, the logic
|
||||
# doesn't need to run.
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
|
||||
# Find which rooms are partially stated and may need to be filtered out
|
||||
|
||||
@@ -130,7 +130,7 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
"_get_rooms_for_local_user_where_membership_is_inner", (user_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user", (user_id,)
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots", (user_id,)
|
||||
)
|
||||
|
||||
# Purge other caches based on room state.
|
||||
@@ -138,7 +138,9 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
|
||||
)
|
||||
|
||||
def _invalidate_state_caches_all(self, room_id: str) -> None:
|
||||
"""Invalidates caches that are based on the current state, but does
|
||||
@@ -168,7 +170,9 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
|
||||
)
|
||||
|
||||
def _attempt_to_invalidate_cache(
|
||||
self, cache_name: str, key: Optional[Collection[Any]]
|
||||
|
||||
@@ -307,7 +307,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
"get_rooms_for_user", (data.state_key,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user", None
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
|
||||
)
|
||||
self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined]
|
||||
elif data.type == EventTypes.RoomEncryption:
|
||||
@@ -319,7 +319,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
|
||||
if (data.type, data.state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user", None
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
|
||||
)
|
||||
elif row.type == EventsStreamAllStateRow.TypeId:
|
||||
assert isinstance(data, EventsStreamAllStateRow)
|
||||
@@ -330,7 +330,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
|
||||
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
|
||||
)
|
||||
else:
|
||||
raise Exception("Unknown events stream row type %s" % (row.type,))
|
||||
|
||||
@@ -394,7 +396,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
"_get_rooms_for_local_user_where_membership_is_inner", (state_key,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user", (state_key,)
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots",
|
||||
(state_key,),
|
||||
)
|
||||
|
||||
self._attempt_to_invalidate_cache(
|
||||
@@ -413,7 +416,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
|
||||
|
||||
if (etype, state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
|
||||
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
|
||||
)
|
||||
|
||||
if relates_to:
|
||||
self._attempt_to_invalidate_cache(
|
||||
@@ -470,7 +475,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache(
|
||||
"_get_rooms_for_local_user_where_membership_is_inner", None
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
|
||||
)
|
||||
self._attempt_to_invalidate_cache("did_forget", None)
|
||||
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("get_references_for_event", None)
|
||||
@@ -529,7 +536,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_current_hosts_in_room_ordered", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
|
||||
)
|
||||
self._attempt_to_invalidate_cache("did_forget", None)
|
||||
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
|
||||
|
||||
@@ -53,6 +53,7 @@ from synapse.storage.database import (
|
||||
)
|
||||
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.databases.main.stream import _filter_results_by_stream
|
||||
from synapse.storage.engines import Sqlite3Engine
|
||||
from synapse.storage.roommember import (
|
||||
MemberSummary,
|
||||
@@ -65,6 +66,7 @@ from synapse.types import (
|
||||
PersistedEventPosition,
|
||||
StateMap,
|
||||
StrCollection,
|
||||
StreamToken,
|
||||
get_domain_from_id,
|
||||
)
|
||||
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
|
||||
@@ -1389,7 +1391,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
txn, self.get_forgotten_rooms_for_user, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_sliding_sync_rooms_for_user, (user_id,)
|
||||
txn,
|
||||
self.get_sliding_sync_rooms_for_user_from_membership_snapshots,
|
||||
(user_id,),
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction("forget_membership", f)
|
||||
@@ -1421,25 +1425,30 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
)
|
||||
|
||||
@cached(iterable=True, max_entries=10000)
|
||||
async def get_sliding_sync_rooms_for_user(
|
||||
self,
|
||||
user_id: str,
|
||||
async def get_sliding_sync_rooms_for_user_from_membership_snapshots(
|
||||
self, user_id: str
|
||||
) -> Mapping[str, RoomsForUserSlidingSync]:
|
||||
"""Get all the rooms for a user to handle a sliding sync request.
|
||||
"""
|
||||
Get all the rooms for a user to handle a sliding sync request from the
|
||||
`sliding_sync_membership_snapshots` table. These will be current memberships and
|
||||
need to be rewound to the token range.
|
||||
|
||||
Ignores forgotten rooms and rooms that the user has left themselves.
|
||||
|
||||
Args:
|
||||
user_id: The user ID to get the rooms for.
|
||||
|
||||
Returns:
|
||||
Map from room ID to membership info
|
||||
"""
|
||||
|
||||
def get_sliding_sync_rooms_for_user_txn(
|
||||
def _txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Dict[str, RoomsForUserSlidingSync]:
|
||||
# XXX: If you use any new columns that can change (like from
|
||||
# `sliding_sync_joined_rooms` or `forgotten`), make sure to bust the
|
||||
# `get_sliding_sync_rooms_for_user` cache in the appropriate places (and add
|
||||
# tests).
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` cache in the
|
||||
# appropriate places (and add tests).
|
||||
sql = """
|
||||
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
|
||||
r.room_version,
|
||||
@@ -1455,6 +1464,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
AND (m.membership != 'leave' OR m.user_id != m.sender)
|
||||
"""
|
||||
txn.execute(sql, (user_id,))
|
||||
|
||||
return {
|
||||
row[0]: RoomsForUserSlidingSync(
|
||||
room_id=row[0],
|
||||
@@ -1475,8 +1485,113 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
}
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_sliding_sync_rooms_for_user",
|
||||
get_sliding_sync_rooms_for_user_txn,
|
||||
"get_sliding_sync_rooms_for_user_from_membership_snapshots",
|
||||
_txn,
|
||||
)
|
||||
|
||||
async def get_sliding_sync_self_leave_rooms_after_to_token(
|
||||
self,
|
||||
user_id: str,
|
||||
to_token: StreamToken,
|
||||
) -> Dict[str, RoomsForUserSlidingSync]:
|
||||
"""
|
||||
Get all the self-leave rooms for a user after the `to_token` (outside the token
|
||||
range) that are potentially relevant[1] and needed to handle a sliding sync
|
||||
request. The results are from the `sliding_sync_membership_snapshots` table and
|
||||
will be current memberships and need to be rewound to the token range.
|
||||
|
||||
[1] If a leave happens after the token range, we may have still been joined (or
|
||||
any non-self-leave which is relevant to sync) to the room before so we need to
|
||||
include it in the list of potentially relevant rooms and apply
|
||||
our rewind logic (outside of this function) to see if it's actually relevant.
|
||||
|
||||
This is basically a sister-function to
|
||||
`get_sliding_sync_rooms_for_user_from_membership_snapshots`. We could
|
||||
alternatively incorporate this logic into
|
||||
`get_sliding_sync_rooms_for_user_from_membership_snapshots` but those results
|
||||
are cached and the `to_token` isn't very cache friendly (people are constantly
|
||||
requesting with new tokens) so we separate it out here.
|
||||
|
||||
Args:
|
||||
user_id: The user ID to get the rooms for.
|
||||
to_token: Any self-leave memberships after this position will be returned.
|
||||
|
||||
Returns:
|
||||
Map from room ID to membership info
|
||||
"""
|
||||
# TODO: Potential to check
|
||||
# `self._membership_stream_cache.has_entity_changed(...)` as an early-return
|
||||
# shortcut.
|
||||
|
||||
def _txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Dict[str, RoomsForUserSlidingSync]:
|
||||
sql = """
|
||||
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
|
||||
r.room_version,
|
||||
m.event_instance_name, m.event_stream_ordering,
|
||||
m.has_known_state,
|
||||
m.room_type,
|
||||
m.is_encrypted
|
||||
FROM sliding_sync_membership_snapshots AS m
|
||||
INNER JOIN rooms AS r USING (room_id)
|
||||
WHERE user_id = ?
|
||||
AND m.forgotten = 0
|
||||
AND m.membership = 'leave'
|
||||
AND m.user_id = m.sender
|
||||
AND (m.event_stream_ordering > ?)
|
||||
"""
|
||||
# If a leave happens after the token range, we may have still been joined
|
||||
# (or any non-self-leave which is relevant to sync) to the room before so we
|
||||
# need to include it in the list of potentially relevant rooms and apply our
|
||||
# rewind logic (outside of this function).
|
||||
#
|
||||
# To handle tokens with a non-empty instance_map we fetch more
|
||||
# results than necessary and then filter down
|
||||
min_to_token_position = to_token.room_key.stream
|
||||
txn.execute(sql, (user_id, min_to_token_position))
|
||||
|
||||
# Map from room_id to membership info
|
||||
room_membership_for_user_map: Dict[str, RoomsForUserSlidingSync] = {}
|
||||
for row in txn:
|
||||
room_for_user = RoomsForUserSlidingSync(
|
||||
room_id=row[0],
|
||||
sender=row[1],
|
||||
membership=row[2],
|
||||
event_id=row[3],
|
||||
room_version_id=row[4],
|
||||
event_pos=PersistedEventPosition(row[5], row[6]),
|
||||
has_known_state=bool(row[7]),
|
||||
room_type=row[8],
|
||||
is_encrypted=bool(row[9]),
|
||||
)
|
||||
|
||||
# We filter out unknown room versions proactively. They shouldn't go
|
||||
# down sync and their metadata may be in a broken state (causing
|
||||
# errors).
|
||||
if row[4] not in KNOWN_ROOM_VERSIONS:
|
||||
continue
|
||||
|
||||
# We only want to include the self-leave membership if it happened after
|
||||
# the token range.
|
||||
#
|
||||
# Since the database pulls out more than necessary, we need to filter it
|
||||
# down here.
|
||||
if _filter_results_by_stream(
|
||||
lower_token=None,
|
||||
upper_token=to_token.room_key,
|
||||
instance_name=room_for_user.event_pos.instance_name,
|
||||
stream_ordering=room_for_user.event_pos.stream,
|
||||
):
|
||||
continue
|
||||
|
||||
room_membership_for_user_map[room_for_user.room_id] = room_for_user
|
||||
|
||||
return room_membership_for_user_map
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_sliding_sync_self_leave_rooms_after_to_token",
|
||||
_txn,
|
||||
)
|
||||
|
||||
async def get_sliding_sync_room_for_user(
|
||||
|
||||
@@ -453,6 +453,8 @@ def _filter_results_by_stream(
|
||||
stream_ordering falls between the two tokens (taking a None
|
||||
token to mean unbounded).
|
||||
|
||||
The token range is defined by > `lower_token` and <= `upper_token`.
|
||||
|
||||
Used to filter results from fetching events in the DB against the given
|
||||
tokens. This is necessary to handle the case where the tokens include
|
||||
position maps, which we handle by fetching more than necessary from the DB
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user