Mirror of https://github.com/element-hq/synapse.git

Compare commits: madlittlem...erikj/acco (9 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 2b2e37e580 | |
| | 7d5484ea0d | |
| | 65dc3aa5b8 | |
| | 48ab85f276 | |
| | 68c5cd8c0b | |
| | aae9e912de | |
| | 1c1eaf7b5f | |
| | 20be70dae4 | |
| | 119b7527fb | |
changelog.d/17695.bugfix (new file, 1 line added)
@@ -0,0 +1 @@
Fix bug where room account data would not correctly be sent down sliding sync for old rooms.
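The fix below hinges on classifying each room that is relevant to the account_data extension by whether this connection has already been sent its account data. A minimal sketch of that classification, using the flag names from the diff (the helper and its inputs are simplified stand-ins, not Synapse's actual API):

```python
from enum import Enum
from typing import Dict, Optional, Set, Tuple


class HaveSentRoomFlag(Enum):
    LIVE = "live"              # already up to date on this connection
    PREVIOUSLY = "previously"  # sent before, updates may be pending
    NEVER = "never"            # never sent down this connection


def classify_rooms(
    relevant_room_ids: Set[str],
    statuses: Dict[str, HaveSentRoomFlag],
    last_tokens: Dict[str, int],
    from_token: Optional[int],
) -> Tuple[Set[str], Dict[str, int], Set[str]]:
    """Split rooms into live / previously / initial buckets (simplified)."""
    live: Set[str] = set()
    previously: Dict[str, int] = {}
    initial: Set[str] = set()

    for room_id in relevant_room_ids:
        if from_token is None:
            # Initial sync: every room needs a full snapshot.
            initial.add(room_id)
            continue

        status = statuses.get(room_id, HaveSentRoomFlag.NEVER)
        if status is HaveSentRoomFlag.LIVE:
            live.add(room_id)
        elif status is HaveSentRoomFlag.PREVIOUSLY:
            # Resume from the stream position last sent for this room.
            previously[room_id] = last_tokens[room_id]
        else:
            initial.add(room_id)

    return live, previously, initial
```

Rooms that fall out of the sliding window and later come back land in the `previously` or `initial` buckets and therefore get their room account data (re)sent, which is the behaviour the changelog entry describes.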
@@ -19,7 +19,6 @@ from typing import (
    AbstractSet,
    ChainMap,
    Dict,
    List,
    Mapping,
    MutableMapping,
    Optional,
@@ -119,6 +118,8 @@ class SlidingSyncExtensionHandler:
        if sync_config.extensions.account_data is not None:
            account_data_response = await self.get_account_data_extension_response(
                sync_config=sync_config,
                previous_connection_state=previous_connection_state,
                new_connection_state=new_connection_state,
                actual_lists=actual_lists,
                actual_room_ids=actual_room_ids,
                account_data_request=sync_config.extensions.account_data,
@@ -361,6 +362,8 @@ class SlidingSyncExtensionHandler:
    async def get_account_data_extension_response(
        self,
        sync_config: SlidingSyncConfig,
        previous_connection_state: "PerConnectionState",
        new_connection_state: "MutablePerConnectionState",
        actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList],
        actual_room_ids: Set[str],
        account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
@@ -425,15 +428,7 @@

        # Fetch room account data
        #
        # List of -> Mapping from room_id to mapping of `type` to `content` of room
        # account data events.
        #
        # This is a list so we can avoid making copies of immutable data and instead
        # just provide multiple maps that need to be combined. Normally, we could
        # reach for `ChainMap` in this scenario, but this is a nested map and accessing
        # the ChainMap by room_id won't combine the two maps for that room (we would
        # need a new `NestedChainMap` type class).
        account_data_by_room_maps: List[Mapping[str, Mapping[str, JsonMapping]]] = []
        account_data_by_room_map: MutableMapping[str, Mapping[str, JsonMapping]] = {}
        relevant_room_ids = self.find_relevant_room_ids_for_extension(
            requested_lists=account_data_request.lists,
            requested_room_ids=account_data_request.rooms,
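The comment above (which this diff removes) points at the limitation that motivated the list-of-maps approach: `ChainMap` only merges keys at the top level, so two maps that both contain an entry for the same room_id do not get their inner type-to-content maps combined. A small self-contained illustration:

```python
from collections import ChainMap

tags_map = {"!room:example.org": {"m.tag": {"tags": {"m.favourite": {}}}}}
data_map = {"!room:example.org": {"org.example.setting": {"enabled": True}}}

combined = ChainMap(tags_map, data_map)

# Looking up the room returns only the first map's inner dict; the
# "org.example.setting" entry from the second map is shadowed.
print(combined["!room:example.org"])
# {'m.tag': {'tags': {'m.favourite': {}}}}

# To merge per room, the inner maps have to be chained room by room,
# which is what the handler ends up doing further down.
per_room = ChainMap(tags_map["!room:example.org"], data_map["!room:example.org"])
print(dict(per_room))
# {'m.tag': {'tags': {'m.favourite': {}}}, 'org.example.setting': {'enabled': True}}
```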
@@ -441,9 +436,43 @@
            actual_room_ids=actual_room_ids,
        )
        if len(relevant_room_ids) > 0:
            # We need to handle the different cases depending on if we have sent
            # down account data previously or not, so we split the relevant
            # rooms up into different collections based on status.
            live_rooms = set()
            previously_rooms: Dict[str, int] = {}
            initial_rooms = set()

            for room_id in relevant_room_ids:
                if not from_token:
                    initial_rooms.add(room_id)
                    continue

                room_status = previous_connection_state.account_data.have_sent_room(
                    room_id
                )
                if room_status.status == HaveSentRoomFlag.LIVE:
                    live_rooms.add(room_id)
                elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
                    assert room_status.last_token is not None
                    previously_rooms[room_id] = room_status.last_token
                elif room_status.status == HaveSentRoomFlag.NEVER:
                    initial_rooms.add(room_id)
                else:
                    assert_never(room_status.status)

            # We fetch all room account data since the from_token. This is so
            # that we can record which rooms have updates that haven't been sent
            # down.
            #
            # Mapping from room_id to mapping of `type` to `content` of room account
            # data events.
            all_updates_since_the_from_token: Mapping[
                str, Mapping[str, JsonMapping]
            ] = {}
            if from_token is not None:
                # TODO: This should take into account the `from_token` and `to_token`
                account_data_by_room_map = (
                all_updates_since_the_from_token = (
                    await self.store.get_updated_room_account_data_for_user(
                        user_id, from_token.stream_token.account_data_key
                    )
@@ -456,58 +485,109 @@
                        user_id, from_token.stream_token.account_data_key
                    )
                for room_id, tags in tags_by_room.items():
                    account_data_by_room_map.setdefault(room_id, {})[
                    all_updates_since_the_from_token.setdefault(room_id, {})[
                        AccountDataTypes.TAG
                    ] = {"tags": tags}

                account_data_by_room_maps.append(account_data_by_room_map)
            else:
                # TODO: This should take into account the `to_token`
                immutable_account_data_by_room_map = (
                    await self.store.get_room_account_data_for_user(user_id)
                )
                account_data_by_room_maps.append(immutable_account_data_by_room_map)
            # For live rooms we just get the updates from `all_updates_since_the_from_token`
            if live_rooms:
                for room_id in all_updates_since_the_from_token.keys() & live_rooms:
                    account_data_by_room_map[room_id] = (
                        all_updates_since_the_from_token[room_id]
                    )

                # Add room tags
                #
                # TODO: This should take into account the `to_token`
                tags_by_room = await self.store.get_tags_for_user(user_id)
                account_data_by_room_maps.append(
                    {
                        room_id: {AccountDataTypes.TAG: {"tags": tags}}
                        for room_id, tags in tags_by_room.items()
                    }
            # For previously and initial rooms we query each room individually.
            if previously_rooms or initial_rooms:

                async def handle_previously(room_id: str) -> None:
                    # Either get updates or all account data in the room
                    # depending on if the room state is PREVIOUSLY or NEVER.
                    previous_token = previously_rooms.get(room_id)
                    if previous_token is not None:
                        room_account_data = await (
                            self.store.get_updated_room_account_data_for_user_for_room(
                                user_id=user_id,
                                room_id=room_id,
                                from_stream_id=previous_token,
                                to_stream_id=to_token.account_data_key,
                            )
                        )

                        # Add room tags
                        changed = await self.store.has_tags_changed_for_room(
                            user_id=user_id,
                            room_id=room_id,
                            from_stream_id=previous_token,
                            to_stream_id=to_token.account_data_key,
                        )
                        if changed:
                            # XXX: Ideally, this should take into account the `to_token`
                            # and return the set of tags at that time but we don't track
                            # changes to tags so we just have to return all tags for the
                            # room.
                            immutable_tag_map = await self.store.get_tags_for_room(
                                user_id, room_id
                            )
                            if immutable_tag_map:
                                room_account_data[AccountDataTypes.TAG] = {
                                    "tags": immutable_tag_map
                                }
                        # Only add an entry if there were any updates.
                        if room_account_data:
                            account_data_by_room_map[room_id] = room_account_data
                    else:
                        # TODO: This should take into account the `to_token`
                        immutable_room_account_data = (
                            await self.store.get_account_data_for_room(user_id, room_id)
                        )

                        # Add room tags
                        #
                        # XXX: Ideally, this should take into account the `to_token`
                        # and return the set of tags at that time but we don't track
                        # changes to tags so we just have to return all tags for the
                        # room.
                        immutable_tag_map = await self.store.get_tags_for_room(
                            user_id, room_id
                        )

                        account_data_by_room_map[room_id] = ChainMap(
                            {AccountDataTypes.TAG: {"tags": immutable_tag_map}}
                            if immutable_tag_map
                            else {},
                            # Cast is safe because `ChainMap` only mutates the top-most map,
                            # see https://github.com/python/typeshed/issues/8430
                            cast(
                                MutableMapping[str, JsonMapping],
                                immutable_room_account_data,
                            ),
                        )

                # We handle these rooms concurrently to speed it up.
                await concurrently_execute(
                    handle_previously,
                    previously_rooms.keys() | initial_rooms,
                    limit=20,
                )
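The diff fans the per-room fetches out with Synapse's `concurrently_execute` helper, capped at 20 rooms in flight. The same bounded-concurrency idea can be sketched with plain asyncio; this is an illustrative stand-in, not the actual helper:

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def bounded_gather(
    func: Callable[[str], Awaitable[None]],
    args: Iterable[str],
    limit: int,
) -> None:
    """Run `func` over `args` with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(arg: str) -> None:
        async with semaphore:
            await func(arg)

    await asyncio.gather(*(run_one(arg) for arg in args))


async def handle_room(room_id: str) -> None:
    # Stand-in for the per-room account data fetch.
    print(f"fetching account data for {room_id}")


asyncio.run(bounded_gather(handle_room, {"!a:x.org", "!b:x.org", "!c:x.org"}, limit=2))
```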
        # Filter down to the relevant rooms ... and combine the maps
        relevant_account_data_by_room_map: MutableMapping[
            str, Mapping[str, JsonMapping]
        ] = {}
        for room_id in relevant_room_ids:
            # We want to avoid adding empty maps for relevant rooms that have no room
            # account data so do a quick check to see if it's in any of the maps.
            is_room_in_maps = False
            for room_map in account_data_by_room_maps:
                if room_id in room_map:
                    is_room_in_maps = True
                    break
            # Now record which rooms are now up to date, and which rooms have
            # pending updates to send.
            new_connection_state.account_data.record_sent_rooms(relevant_room_ids)
            missing_updates = (
                all_updates_since_the_from_token.keys() - relevant_room_ids
            )
            if missing_updates:
                # If we have missing updates then we must have had a from_token.
                assert from_token is not None

            # If we found the room in any of the maps, combine the maps for that room
            if is_room_in_maps:
                relevant_account_data_by_room_map[room_id] = ChainMap(
                    {},
                    *(
                        # Cast is safe because `ChainMap` only mutates the top-most map,
                        # see https://github.com/python/typeshed/issues/8430
                        cast(MutableMapping[str, JsonMapping], room_map[room_id])
                        for room_map in account_data_by_room_maps
                        if room_map.get(room_id)
                    ),
                new_connection_state.account_data.record_unsent_rooms(
                    missing_updates, from_token.stream_token.account_data_key
                )

        return SlidingSyncResult.Extensions.AccountDataExtension(
            global_account_data_map=global_account_data_map,
            account_data_by_room_map=relevant_account_data_by_room_map,
            account_data_by_room_map=account_data_by_room_map,
        )

    @trace
@@ -467,6 +467,56 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
            get_updated_room_account_data_for_user_txn,
        )

    async def get_updated_room_account_data_for_user_for_room(
        self,
        # Since there are multiple arguments with the same type, force keyword arguments
        # so people don't accidentally swap the order
        *,
        user_id: str,
        room_id: str,
        from_stream_id: int,
        to_stream_id: int,
    ) -> Dict[str, JsonMapping]:
        """Get the room account_data that's changed for a user in a room.

        (> `from_stream_id` and <= `to_stream_id`)

        Args:
            user_id: The user to get the account_data for.
            room_id: The room to check
            from_stream_id: The point in the stream to fetch from
            to_stream_id: The point in the stream to fetch to

        Returns:
            A dict of the room account data.
        """

        def get_updated_room_account_data_for_user_for_room_txn(
            txn: LoggingTransaction,
        ) -> Dict[str, JsonMapping]:
            sql = """
                SELECT account_data_type, content FROM room_account_data
                WHERE user_id = ? AND room_id = ? AND stream_id > ? AND stream_id <= ?
            """
            txn.execute(sql, (user_id, room_id, from_stream_id, to_stream_id))

            room_account_data: Dict[str, JsonMapping] = {}
            for row in txn:
                room_account_data[row[0]] = db_to_json(row[1])

            return room_account_data

        changed = self._account_data_stream_cache.has_entity_changed(
            user_id, int(from_stream_id)
        )
        if not changed:
            return {}

        return await self.db_pool.runInteraction(
            "get_updated_room_account_data_for_user_for_room",
            get_updated_room_account_data_for_user_for_room_txn,
        )

    @cached(max_entries=5000, iterable=True)
    async def ignored_by(self, user_id: str) -> FrozenSet[str]:
        """
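Note the short-circuit before the `runInteraction` call: the in-memory stream change cache can prove that nothing for this user changed after `from_stream_id`, in which case the database query is skipped entirely. The pattern in isolation looks roughly like this (a simplified stand-in for Synapse's `StreamChangeCache`; the real cache errs on the side of returning True when it cannot be sure):

```python
from typing import Dict


class SimpleStreamChangeCache:
    """Tracks the last stream position at which each entity changed (simplified)."""

    def __init__(self) -> None:
        self._last_changed: Dict[str, int] = {}

    def entity_has_changed(self, entity: str, stream_pos: int) -> None:
        current = self._last_changed.get(entity, 0)
        self._last_changed[entity] = max(current, stream_pos)

    def has_entity_changed(self, entity: str, stream_pos: int) -> bool:
        # True if the entity may have changed strictly after `stream_pos`.
        return self._last_changed.get(entity, 0) > stream_pos


cache = SimpleStreamChangeCache()
cache.entity_has_changed("@alice:example.org", 42)

assert cache.has_entity_changed("@alice:example.org", 41)
assert not cache.has_entity_changed("@alice:example.org", 42)  # safe to skip the DB query
```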
@@ -267,6 +267,15 @@ class SlidingSyncStore(SQLBaseStore):
                (have_sent_room.status.value, have_sent_room.last_token)
            )

        for (
            room_id,
            have_sent_room,
        ) in per_connection_state.account_data._statuses.items():
            key_values.append((connection_position, "account_data", room_id))
            value_values.append(
                (have_sent_room.status.value, have_sent_room.last_token)
            )

        self.db_pool.simple_upsert_many_txn(
            txn,
            table="sliding_sync_connection_streams",
@@ -407,6 +416,7 @@ class SlidingSyncStore(SQLBaseStore):
        # Now look up the per-room stream data.
        rooms: Dict[str, HaveSentRoom[str]] = {}
        receipts: Dict[str, HaveSentRoom[str]] = {}
        account_data: Dict[str, HaveSentRoom[str]] = {}

        receipt_rows = self.db_pool.simple_select_list_txn(
            txn,

@@ -427,6 +437,8 @@ class SlidingSyncStore(SQLBaseStore):
                rooms[room_id] = have_sent_room
            elif stream == "receipts":
                receipts[room_id] = have_sent_room
            elif stream == "account_data":
                account_data[room_id] = have_sent_room
            else:
                # For forwards compatibility we ignore unknown streams, as in
                # future we want to be able to easily add more stream types.
@@ -435,6 +447,7 @@ class SlidingSyncStore(SQLBaseStore):
        return PerConnectionStateDB(
            rooms=RoomStatusMap(rooms),
            receipts=RoomStatusMap(receipts),
            account_data=RoomStatusMap(account_data),
            room_configs=room_configs,
        )

@@ -452,6 +465,7 @@ class PerConnectionStateDB:

    rooms: "RoomStatusMap[str]"
    receipts: "RoomStatusMap[str]"
    account_data: "RoomStatusMap[str]"

    room_configs: Mapping[str, "RoomSyncConfig"]
@@ -484,10 +498,21 @@ class PerConnectionStateDB:
            for room_id, status in per_connection_state.receipts.get_updates().items()
        }

        account_data = {
            room_id: HaveSentRoom(
                status=status.status,
                last_token=(
                    str(status.last_token) if status.last_token is not None else None
                ),
            )
            for room_id, status in per_connection_state.account_data.get_updates().items()
        }

        log_kv(
            {
                "rooms": rooms,
                "receipts": receipts,
                "account_data": account_data,
                "room_configs": per_connection_state.room_configs.maps[0],
            }
        )

@@ -495,6 +520,7 @@ class PerConnectionStateDB:
        return PerConnectionStateDB(
            rooms=RoomStatusMap(rooms),
            receipts=RoomStatusMap(receipts),
            account_data=RoomStatusMap(account_data),
            room_configs=per_connection_state.room_configs.maps[0],
        )
@@ -524,8 +550,19 @@ class PerConnectionStateDB:
            for room_id, status in self.receipts._statuses.items()
        }

        account_data = {
            room_id: HaveSentRoom(
                status=status.status,
                last_token=(
                    int(status.last_token) if status.last_token is not None else None
                ),
            )
            for room_id, status in self.account_data._statuses.items()
        }

        return PerConnectionState(
            rooms=RoomStatusMap(rooms),
            receipts=RoomStatusMap(receipts),
            account_data=RoomStatusMap(account_data),
            room_configs=self.room_configs,
        )
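As the `from_state`/`to_state` hunks above suggest, `PerConnectionStateDB` keeps every `last_token` as a string (the same column also carries the serialized room and receipt tokens), so the integer account_data stream position is stringified when persisting and parsed back when loading. A tiny round trip of that convention (simplified helpers, not the actual classes):

```python
from typing import Optional


def account_data_token_to_db(last_token: Optional[int]) -> Optional[str]:
    # Persist the integer stream position as text, like the other token types.
    return str(last_token) if last_token is not None else None


def account_data_token_from_db(value: Optional[str]) -> Optional[int]:
    return int(value) if value is not None else None


assert account_data_token_from_db(account_data_token_to_db(1234)) == 1234
assert account_data_token_to_db(None) is None
```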
@@ -158,6 +158,52 @@ class TagsWorkerStore(AccountDataWorkerStore):

        return results

    async def has_tags_changed_for_room(
        self,
        # Since there are multiple arguments with the same type, force keyword arguments
        # so people don't accidentally swap the order
        *,
        user_id: str,
        room_id: str,
        from_stream_id: int,
        to_stream_id: int,
    ) -> bool:
        """Check if the user's tags for a room have been updated in the token range

        (> `from_stream_id` and <= `to_stream_id`)

        Args:
            user_id: The user to get tags for
            room_id: The room to get tags for
            from_stream_id: The point in the stream to fetch from
            to_stream_id: The point in the stream to fetch to

        Returns:
            True if the room's tags changed in the given range, otherwise False.
        """

        # Shortcut if no room has changed for the user
        changed = self._account_data_stream_cache.has_entity_changed(
            user_id, int(from_stream_id)
        )
        if not changed:
            return False

        last_change_position_for_room = await self.db_pool.simple_select_one_onecol(
            table="room_tags_revisions",
            keyvalues={"user_id": user_id, "room_id": room_id},
            retcol="stream_id",
            allow_none=True,
        )

        if last_change_position_for_room is None:
            return False

        return (
            last_change_position_for_room > from_stream_id
            and last_change_position_for_room <= to_stream_id
        )

    async def get_tags_for_room(
        self, user_id: str, room_id: str
    ) -> Dict[str, JsonDict]:
@@ -675,7 +675,7 @@ class HaveSentRoomFlag(Enum):
    LIVE = "live"


T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken)
T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken, int)


@attr.s(auto_attribs=True, slots=True, frozen=True)

@@ -823,6 +823,7 @@ class PerConnectionState:

    rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
    receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
    account_data: RoomStatusMap[int] = attr.Factory(RoomStatusMap)

    room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
@@ -833,6 +834,7 @@ class PerConnectionState:
        return MutablePerConnectionState(
            rooms=self.rooms.get_mutable(),
            receipts=self.receipts.get_mutable(),
            account_data=self.account_data.get_mutable(),
            room_configs=ChainMap({}, room_configs),
        )

@@ -840,6 +842,7 @@ class PerConnectionState:
        return PerConnectionState(
            rooms=self.rooms.copy(),
            receipts=self.receipts.copy(),
            account_data=self.account_data.copy(),
            room_configs=dict(self.room_configs),
        )

@@ -853,6 +856,7 @@ class MutablePerConnectionState(PerConnectionState):

    rooms: MutableRoomStatusMap[RoomStreamToken]
    receipts: MutableRoomStatusMap[MultiWriterStreamToken]
    account_data: MutableRoomStatusMap[int]

    room_configs: typing.ChainMap[str, RoomSyncConfig]

@@ -860,6 +864,7 @@ class MutablePerConnectionState(PerConnectionState):
        return (
            bool(self.rooms.get_updates())
            or bool(self.receipts.get_updates())
            or bool(self.account_data.get_updates())
            or bool(self.get_room_config_updates())
        )
@@ -354,6 +354,8 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
        """
        On incremental sync, we return all account data for a given room but only for
        rooms that we request and are being returned in the Sliding Sync response.

        (HaveSentRoomFlag.LIVE)
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")
@@ -480,6 +482,337 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
            {"tags": {"m.favourite": {}, "m.server_notice": {}}},
        )

    def test_room_account_data_incremental_sync_out_of_range_never(self) -> None:
        """Tests that we don't return account data for rooms that fall out of
        range, but then do send all account data once they're back in range.

        (initial/HaveSentRoomFlag.NEVER)
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        # Create a room and add some room account data
        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
        self.get_success(
            self.account_data_handler.add_account_data_to_room(
                user_id=user1_id,
                room_id=room_id1,
                account_data_type="org.matrix.roorarraz",
                content={"roo": "rar"},
            )
        )
        # Add a room tag to mark the room as a favourite
        self.get_success(
            self.account_data_handler.add_tag_to_room(
                user_id=user1_id,
                room_id=room_id1,
                tag="m.favourite",
                content={},
            )
        )
        # Create another room with some room account data
        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
        self.get_success(
            self.account_data_handler.add_account_data_to_room(
                user_id=user1_id,
                room_id=room_id2,
                account_data_type="org.matrix.roorarraz",
                content={"roo": "rar"},
            )
        )
        # Add a room tag to mark the room as a favourite
        self.get_success(
            self.account_data_handler.add_tag_to_room(
                user_id=user1_id,
                room_id=room_id2,
                tag="m.favourite",
                content={},
            )
        )

        # Now send a message into room1 so that it is at the top of the list
        self.helper.send(room_id1, body="new event", tok=user1_tok)

        # Make a SS request for only the top room.
        sync_body = {
            "lists": {
                "main": {
                    "ranges": [[0, 0]],
                    "required_state": [],
                    "timeline_limit": 0,
                }
            },
            "extensions": {
                "account_data": {
                    "enabled": True,
                    "lists": ["main"],
                }
            },
        }
        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)

        # Only room1 should be in the response since it's the latest room with activity
        # and our range only includes 1 room.
        self.assertIncludes(
            response_body["extensions"]["account_data"].get("rooms").keys(),
            {room_id1},
            exact=True,
        )
        # Add some other room account data
        self.get_success(
            self.account_data_handler.add_account_data_to_room(
                user_id=user1_id,
                room_id=room_id1,
                account_data_type="org.matrix.roorarraz2",
                content={"roo": "rar"},
            )
        )
        self.get_success(
            self.account_data_handler.add_account_data_to_room(
                user_id=user1_id,
                room_id=room_id2,
                account_data_type="org.matrix.roorarraz2",
                content={"roo": "rar"},
            )
        )
        # Add another room tag
        self.get_success(
            self.account_data_handler.add_tag_to_room(
                user_id=user1_id,
                room_id=room_id1,
                tag="m.server_notice",
                content={},
            )
        )
        self.get_success(
            self.account_data_handler.add_tag_to_room(
                user_id=user1_id,
                room_id=room_id2,
                tag="m.server_notice",
                content={},
            )
        )

        # Move room2 into range.
        self.helper.send(room_id2, body="new event", tok=user1_tok)

        # Make an incremental Sliding Sync request with the account_data extension enabled
        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

        self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
        # We expect to see the account data of room2, as that has the most
        # recent update.
        self.assertIncludes(
            response_body["extensions"]["account_data"].get("rooms").keys(),
            {room_id2},
            exact=True,
        )
        # Since this is the first time we're seeing room2 down sync, we should see all
        # room account data for it.
        account_data_map = {
            event["type"]: event["content"]
            for event in response_body["extensions"]["account_data"]
            .get("rooms")
            .get(room_id2)
        }
        self.assertIncludes(
            account_data_map.keys(),
            {"org.matrix.roorarraz", "org.matrix.roorarraz2", AccountDataTypes.TAG},
            exact=True,
        )
        self.assertEqual(account_data_map["org.matrix.roorarraz"], {"roo": "rar"})
        self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
        self.assertEqual(
            account_data_map[AccountDataTypes.TAG],
            {"tags": {"m.favourite": {}, "m.server_notice": {}}},
        )
    def test_room_account_data_incremental_sync_out_of_range_previously(self) -> None:
        """Tests that we don't return account data for rooms that fall out of
        range, but then do send all account data that has changed once they're back in range.

        (HaveSentRoomFlag.PREVIOUSLY)
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        # Create a room and add some room account data
        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
        self.get_success(
            self.account_data_handler.add_account_data_to_room(
                user_id=user1_id,
                room_id=room_id1,
                account_data_type="org.matrix.roorarraz",
                content={"roo": "rar"},
            )
        )
        # Add a room tag to mark the room as a favourite
        self.get_success(
            self.account_data_handler.add_tag_to_room(
                user_id=user1_id,
                room_id=room_id1,
                tag="m.favourite",
                content={},
            )
        )

        # Create another room with some room account data
        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
        self.get_success(
            self.account_data_handler.add_account_data_to_room(
                user_id=user1_id,
                room_id=room_id2,
                account_data_type="org.matrix.roorarraz",
                content={"roo": "rar"},
            )
        )
        # Add a room tag to mark the room as a favourite
        self.get_success(
            self.account_data_handler.add_tag_to_room(
                user_id=user1_id,
                room_id=room_id2,
                tag="m.favourite",
                content={},
            )
        )

        # Make an initial Sliding Sync request for only room1 and room2.
        sync_body = {
            "lists": {},
            "room_subscriptions": {
                room_id1: {
                    "required_state": [],
                    "timeline_limit": 0,
                },
                room_id2: {
                    "required_state": [],
                    "timeline_limit": 0,
                },
            },
            "extensions": {
                "account_data": {
                    "enabled": True,
                    "rooms": [room_id1, room_id2],
                }
            },
        }
        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
        # Both rooms show up because we have a room subscription for each and they're
        # requested in the `account_data` extension.
        self.assertIncludes(
            response_body["extensions"]["account_data"].get("rooms").keys(),
            {room_id1, room_id2},
            exact=True,
        )

        # Add some other room account data
        self.get_success(
            self.account_data_handler.add_account_data_to_room(
                user_id=user1_id,
                room_id=room_id1,
                account_data_type="org.matrix.roorarraz2",
                content={"roo": "rar"},
            )
        )
        self.get_success(
            self.account_data_handler.add_account_data_to_room(
                user_id=user1_id,
                room_id=room_id2,
                account_data_type="org.matrix.roorarraz2",
                content={"roo": "rar"},
            )
        )
        # Add another room tag
        self.get_success(
            self.account_data_handler.add_tag_to_room(
                user_id=user1_id,
                room_id=room_id1,
                tag="m.server_notice",
                content={},
            )
        )
        self.get_success(
            self.account_data_handler.add_tag_to_room(
                user_id=user1_id,
                room_id=room_id2,
                tag="m.server_notice",
                content={},
            )
        )

        # Make an incremental Sliding Sync request for just room1
        response_body, from_token = self.do_sync(
            {
                **sync_body,
                "room_subscriptions": {
                    room_id1: {
                        "required_state": [],
                        "timeline_limit": 0,
                    },
                },
            },
            since=from_token,
            tok=user1_tok,
        )

        # Only room1 shows up because we only have a room subscription for room1 now.
        self.assertIncludes(
            response_body["extensions"]["account_data"].get("rooms").keys(),
            {room_id1},
            exact=True,
        )

        # Make an incremental Sliding Sync request for just room2 now
        response_body, from_token = self.do_sync(
            {
                **sync_body,
                "room_subscriptions": {
                    room_id2: {
                        "required_state": [],
                        "timeline_limit": 0,
                    },
                },
            },
            since=from_token,
            tok=user1_tok,
        )
        # Only room2 shows up because we only have a room subscription for room2 now.
        self.assertIncludes(
            response_body["extensions"]["account_data"].get("rooms").keys(),
            {room_id2},
            exact=True,
        )

        self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
        # Check for room account data for room2
        self.assertIncludes(
            response_body["extensions"]["account_data"].get("rooms").keys(),
            {room_id2},
            exact=True,
        )
        # We should see any room account data updates for room2 since the last
        # time we saw it down sync
        account_data_map = {
            event["type"]: event["content"]
            for event in response_body["extensions"]["account_data"]
            .get("rooms")
            .get(room_id2)
        }
        self.assertIncludes(
            account_data_map.keys(),
            {"org.matrix.roorarraz2", AccountDataTypes.TAG},
            exact=True,
        )
        self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"})
        self.assertEqual(
            account_data_map[AccountDataTypes.TAG],
            {"tags": {"m.favourite": {}, "m.server_notice": {}}},
        )

    def test_wait_for_new_data(self) -> None:
        """
        Test to make sure that the Sliding Sync request waits for new data to arrive.