mirror of https://github.com/element-hq/synapse.git (synced 2025-12-07 01:20:16 +00:00)

Compare commits
6 commits

| Author | SHA1 | Date |
|---|---|---|
| madlittlem | 3ac78ff218 | |
| madlittlem | c51b9fb51e | |
| madlittlem | 6ccd2d8b32 | |
| madlittlem | e8a5dbb10c | |
| madlittlem | 45cb906695 | |
| madlittlem | 6885d368bf | |
changelog.d/17505.feature (new file, 1 line)
@@ -0,0 +1 @@
Add typing notification extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
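For context, a client opts into this extension through the `extensions` section of the Sliding Sync request body. A rough sketch of the shape exercised by the tests added below (the list key and room ID are illustrative):

```python
# Illustrative request body only; the field names follow the tests in this PR.
sync_body = {
    "lists": {
        "foo-list": {"ranges": [[0, 1]], "required_state": [], "timeline_limit": 0},
    },
    "extensions": {
        "typing": {
            "enabled": True,
            # Both filters default to the wildcard ["*"] when omitted.
            "lists": ["foo-list"],
            "rooms": ["!room:example.org"],
        }
    },
}
```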
@@ -288,6 +288,10 @@ class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
        explicit_room_id: Optional[str] = None,
        to_key: Optional[MultiWriterStreamToken] = None,
    ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
        """
        Find read receipts for given rooms (> `from_token` and <= `to_token`)
        """

        if to_key is None:
            to_key = self.get_current_key()
@@ -2284,11 +2284,24 @@ class SlidingSyncHandler:
                from_token=from_token,
            )

        typing_response = None
        if sync_config.extensions.typing is not None:
            typing_response = await self.get_typing_extension_response(
                sync_config=sync_config,
                actual_lists=actual_lists,
                actual_room_ids=actual_room_ids,
                actual_room_response_map=actual_room_response_map,
                typing_request=sync_config.extensions.typing,
                to_token=to_token,
                from_token=from_token,
            )

        return SlidingSyncResult.Extensions(
            to_device=to_device_response,
            e2ee=e2ee_response,
            account_data=account_data_response,
            receipts=receipts_response,
            typing=typing_response,
        )

    def find_relevant_room_ids_for_extension(
@@ -2615,6 +2628,8 @@ class SlidingSyncHandler:

        room_id_to_receipt_map: Dict[str, JsonMapping] = {}
        if len(relevant_room_ids) > 0:
            # TODO: Take connection tracking into account so that when a room comes back
            # into range we can send the receipts that were missed.
            receipt_source = self.event_sources.sources.receipt
            receipts, _ = await receipt_source.get_new_events(
                user=sync_config.user,
@@ -2636,6 +2651,8 @@ class SlidingSyncHandler:
                type = receipt["type"]
                content = receipt["content"]

                # For `initial: True` rooms, we only want to include receipts for events
                # in the timeline.
                room_result = actual_room_response_map.get(room_id)
                if room_result is not None:
                    if room_result.initial:
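The `initial: True` handling described in the comment above boils down to dropping receipt entries whose event IDs are not part of the room's returned timeline. A minimal sketch of that idea (the helper name and shapes are illustrative, not the actual implementation):

```python
from typing import Set

from synapse.types import JsonDict


def filter_receipt_content_to_timeline(
    content: JsonDict, timeline_event_ids: Set[str]
) -> JsonDict:
    """Keep only receipt entries for events that appear in the returned timeline."""
    return {
        event_id: receipts_by_type
        for event_id, receipts_by_type in content.items()
        if event_id in timeline_event_ids
    }


# e.g. only `$event2` is in the room's timeline, so `$event1`'s receipt is dropped
filtered = filter_receipt_content_to_timeline(
    {
        "$event1": {"m.read": {"@alice:example.org": {"ts": 1}}},
        "$event2": {"m.read": {"@bob:example.org": {"ts": 2}}},
    },
    {"$event2"},
)
assert filtered == {"$event2": {"m.read": {"@bob:example.org": {"ts": 2}}}}
```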
@@ -2659,6 +2676,70 @@ class SlidingSyncHandler:
            room_id_to_receipt_map=room_id_to_receipt_map,
        )

    async def get_typing_extension_response(
        self,
        sync_config: SlidingSyncConfig,
        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
        actual_room_ids: Set[str],
        actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
        typing_request: SlidingSyncConfig.Extensions.TypingExtension,
        to_token: StreamToken,
        from_token: Optional[SlidingSyncStreamToken],
    ) -> Optional[SlidingSyncResult.Extensions.TypingExtension]:
        """Handle Typing Notification extension (MSC3961)

        Args:
            sync_config: Sync configuration
            actual_lists: Sliding window API. A map of list key to list results in the
                Sliding Sync response.
            actual_room_ids: The actual room IDs in the Sliding Sync response.
            actual_room_response_map: A map of room ID to room results in the
                Sliding Sync response.
            typing_request: The typing extension from the request
            to_token: The point in the stream to sync up to.
            from_token: The point in the stream to sync from.
        """
        # Skip if the extension is not enabled
        if not typing_request.enabled:
            return None

        relevant_room_ids = self.find_relevant_room_ids_for_extension(
            requested_lists=typing_request.lists,
            requested_room_ids=typing_request.rooms,
            actual_lists=actual_lists,
            actual_room_ids=actual_room_ids,
        )

        room_id_to_typing_map: Dict[str, JsonMapping] = {}
        if len(relevant_room_ids) > 0:
            # Note: We don't need to take connection tracking into account for typing
            # notifications because, when a room comes back into range, the client will
            # still get anything that is relevant and hasn't timed out. We consider the
            # gap where the room fell out of range to be long enough for any typing
            # notifications to have timed out (it's not worth the 30 seconds of data we
            # may have missed).
            typing_source = self.event_sources.sources.typing
            typing_notifications, _ = await typing_source.get_new_events(
                user=sync_config.user,
                from_key=(from_token.stream_token.typing_key if from_token else 0),
                to_key=to_token.typing_key,
                # This is a dummy value and isn't used in the function
                limit=0,
                room_ids=relevant_room_ids,
                is_guest=False,
            )

            for typing_notification in typing_notifications:
                # These fields should exist for every typing notification
                room_id = typing_notification["room_id"]
                type = typing_notification["type"]
                content = typing_notification["content"]

                room_id_to_typing_map[room_id] = {"type": type, "content": content}

        return SlidingSyncResult.Extensions.TypingExtension(
            room_id_to_typing_map=room_id_to_typing_map,
        )


class HaveSentRoomFlag(Enum):
    """Flag for whether we have sent the room down a sliding sync connection.
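`get_typing_extension_response(...)` leans on `find_relevant_room_ids_for_extension(...)` to intersect the extension's `lists`/`rooms` filters with what is actually in the response. A simplified sketch of that resolution, assuming the wildcard `"*"` semantics the tests below exercise (argument shapes are simplified, not the real signature):

```python
from typing import Dict, List, Optional, Set


def resolve_relevant_room_ids(
    requested_lists: Optional[List[str]],
    requested_room_ids: Optional[List[str]],
    actual_lists: Dict[str, Set[str]],  # list key -> room IDs returned for that list
    actual_room_ids: Set[str],  # all room IDs in the Sliding Sync response
) -> Set[str]:
    relevant: Set[str] = set()

    if requested_lists is not None:
        for list_key in requested_lists:
            if list_key == "*":
                # Wildcard: every sliding window list in the response counts
                for rooms_in_list in actual_lists.values():
                    relevant.update(rooms_in_list)
            elif list_key in actual_lists:
                relevant.update(actual_lists[list_key])

    if requested_room_ids is not None:
        for room_id in requested_room_ids:
            if room_id == "*":
                # Wildcard: every room in the response counts
                relevant.update(actual_room_ids)
            elif room_id in actual_room_ids:
                relevant.add(room_id)

    return relevant
```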
@@ -565,7 +565,12 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
        room_ids: Iterable[str],
        is_guest: bool,
        explicit_room_id: Optional[str] = None,
        to_key: Optional[int] = None,
    ) -> Tuple[List[JsonMapping], int]:
        """
        Find typing notifications for given rooms (> `from_token` and <= `to_token`)
        """

        with Measure(self.clock, "typing.get_new_events"):
            from_key = int(from_key)
            handler = self.get_typing_handler()

@@ -574,7 +579,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
            for room_id in room_ids:
                if room_id not in handler._room_serials:
                    continue
-               if handler._room_serials[room_id] <= from_key:
+               if handler._room_serials[room_id] <= from_key or (
+                   to_key is not None and handler._room_serials[room_id] > to_key
+               ):
                    continue

                events.append(self._make_event_for(room_id))
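With the optional `to_key` added above, typing notifications can be windowed between two stream positions instead of always being read up to the live position. A small illustration of the filter the second hunk implements (the serial values are made up):

```python
from typing import Dict, List, Optional

# Hypothetical "latest typing serial" per room, standing in for handler._room_serials
room_serials: Dict[str, int] = {"!a:test": 5, "!b:test": 9, "!c:test": 12}

from_key = 5
to_key: Optional[int] = 10  # None would mean "up to the current position"

# A room is only included if its serial falls inside (from_key, to_key]
included: List[str] = [
    room_id
    for room_id, serial in room_serials.items()
    if serial > from_key and (to_key is None or serial <= to_key)
]
assert included == ["!b:test"]  # !a:test is too old, !c:test is beyond to_key
```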
@@ -1152,10 +1152,14 @@ class SlidingSyncRestServlet(RestServlet):

        if extensions.receipts is not None:
            serialized_extensions["receipts"] = {
                # Same as the top-level `account_data.events` field in Sync v2.
                "rooms": extensions.receipts.room_id_to_receipt_map,
            }

        if extensions.typing is not None:
            serialized_extensions["typing"] = {
                "rooms": extensions.typing.room_id_to_typing_map,
            }

        return serialized_extensions
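For reference, the serialized `extensions` object produced here ends up looking roughly like the following in the `/sync` response (room IDs, users, and timestamps are illustrative):

```python
serialized_extensions = {
    "receipts": {
        "rooms": {
            "!room:example.org": {
                "type": "m.receipt",
                "content": {
                    "$event_id": {"m.read": {"@alice:example.org": {"ts": 1234}}},
                },
            },
        },
    },
    "typing": {
        "rooms": {
            "!room:example.org": {
                "type": "m.typing",
                "content": {"user_ids": ["@alice:example.org"]},
            },
        },
    },
}
```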
@@ -366,7 +366,8 @@ class SlidingSyncResult:
            """The Receipts extension (MSC3960)

            Attributes:
-               room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content)
+               room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
+                   event (type, content)
            """

            room_id_to_receipt_map: Mapping[str, JsonMapping]

@@ -374,14 +375,33 @@ class SlidingSyncResult:
            def __bool__(self) -> bool:
                return bool(self.room_id_to_receipt_map)

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class TypingExtension:
            """The Typing Notification extension (MSC3961)

            Attributes:
                room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
                    event (type, content)
            """

            room_id_to_typing_map: Mapping[str, JsonMapping]

            def __bool__(self) -> bool:
                return bool(self.room_id_to_typing_map)

        to_device: Optional[ToDeviceExtension] = None
        e2ee: Optional[E2eeExtension] = None
        account_data: Optional[AccountDataExtension] = None
        receipts: Optional[ReceiptsExtension] = None
        typing: Optional[TypingExtension] = None

        def __bool__(self) -> bool:
            return bool(
-               self.to_device or self.e2ee or self.account_data or self.receipts
+               self.to_device
+               or self.e2ee
+               or self.account_data
+               or self.receipts
+               or self.typing
            )

    next_pos: SlidingSyncStreamToken
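Adding `typing` to `__bool__` matters for long-polling: an empty typing map must not count as "new data", which is exactly what `test_wait_for_new_data_timeout` below checks. A hedged usage sketch (assuming `SlidingSyncResult` is importable from `synapse.types.handlers`, as this PR's layout suggests):

```python
from synapse.types.handlers import SlidingSyncResult  # module path is an assumption

# An extension with an empty map is falsy...
empty_typing = SlidingSyncResult.Extensions.TypingExtension(room_id_to_typing_map={})
assert not empty_typing

# ...so an Extensions object carrying only it is falsy too and won't wake up a
# long-polling /sync request.
assert not SlidingSyncResult.Extensions(typing=empty_typing)

# A non-empty map flips both to truthy.
populated = SlidingSyncResult.Extensions.TypingExtension(
    room_id_to_typing_map={"!room:example.org": {"type": "m.typing", "content": {}}}
)
assert SlidingSyncResult.Extensions(typing=populated)
```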
@@ -359,10 +359,28 @@ class SlidingSyncBody(RequestBodyModel):
            # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
            rooms: Optional[List[StrictStr]] = ["*"]

        class TypingExtension(RequestBodyModel):
            """The Typing Notification extension (MSC3961)

            Attributes:
                enabled
                lists: List of list keys (from the Sliding Window API) to apply this
                    extension to.
                rooms: List of room IDs (from the Room Subscription API) to apply this
                    extension to.
            """

            enabled: Optional[StrictBool] = False
            # Process all lists defined in the Sliding Window API. (This is the default.)
            lists: Optional[List[StrictStr]] = ["*"]
            # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
            rooms: Optional[List[StrictStr]] = ["*"]

        to_device: Optional[ToDeviceExtension] = None
        e2ee: Optional[E2eeExtension] = None
        account_data: Optional[AccountDataExtension] = None
        receipts: Optional[ReceiptsExtension] = None
        typing: Optional[TypingExtension] = None

    conn_id: Optional[str]
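Because `enabled` defaults to `False` and `lists`/`rooms` default to the wildcard `["*"]`, a client only has to send `{"typing": {"enabled": true}}` to get typing notifications for everything in the response. A minimal sketch of that defaulting (assuming `SlidingSyncBody` lives in `synapse.types.rest.client` and behaves like a regular pydantic model, both of which are assumptions based on this PR's layout):

```python
from synapse.types.rest.client import SlidingSyncBody  # module path is an assumption

typing_ext = SlidingSyncBody.Extensions.TypingExtension(enabled=True)

assert typing_ext.enabled is True
assert typing_ext.lists == ["*"]  # wildcard: apply to every list in the response
assert typing_ext.rooms == ["*"]  # wildcard: apply to every room in the response
```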
tests/rest/client/sliding_sync/__init__.py (new file, 20 lines)
@@ -0,0 +1,20 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
tests/rest/client/sliding_sync/test_extension_typing.py (new file, 489 lines)
@@ -0,0 +1,489 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
from synapse.api.constants import EduTypes
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.types import StreamKeyType
from synapse.util import Clock

from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
from tests.server import TimedOutException

logger = logging.getLogger(__name__)


class SlidingSyncTypingExtensionTestCase(SlidingSyncBase):
    """Tests for the typing notification sliding sync extension"""

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
        sync.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main

    def test_no_data_initial_sync(self) -> None:
        """
        Test that enabling the typing extension works during an initial sync,
        even if there is no data.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        # Make an initial Sliding Sync request with the typing extension enabled
        sync_body = {
            "lists": {},
            "extensions": {
                "typing": {
                    "enabled": True,
                }
            },
        }
        response_body, _ = self.do_sync(sync_body, tok=user1_tok)

        self.assertIncludes(
            response_body["extensions"]["typing"].get("rooms").keys(),
            set(),
            exact=True,
        )

    def test_no_data_incremental_sync(self) -> None:
        """
        Test that enabling the typing extension works during an incremental sync, even
        if there is no data.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        sync_body = {
            "lists": {},
            "extensions": {
                "typing": {
                    "enabled": True,
                }
            },
        }
        _, from_token = self.do_sync(sync_body, tok=user1_tok)

        # Make an incremental Sliding Sync request with the typing extension enabled
        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

        self.assertIncludes(
            response_body["extensions"]["typing"].get("rooms").keys(),
            set(),
            exact=True,
        )

    def test_typing_initial_sync(self) -> None:
        """
        On initial sync, we return all typing notifications for rooms that we request
        and are being returned in the Sliding Sync response.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")
        user2_id = self.register_user("user2", "pass")
        user2_tok = self.login(user2_id, "pass")
        user3_id = self.register_user("user3", "pass")
        user3_tok = self.login(user3_id, "pass")
        user4_id = self.register_user("user4", "pass")
        user4_tok = self.login(user4_id, "pass")

        # Create a room
        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
        self.helper.join(room_id1, user1_id, tok=user1_tok)
        self.helper.join(room_id1, user3_id, tok=user3_tok)
        self.helper.join(room_id1, user4_id, tok=user4_tok)
        # User1 starts typing in room1
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id1}/typing/{user1_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user1_tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)
        # User2 starts typing in room1
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id1}/typing/{user2_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user2_tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)

        # Create another room
        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
        self.helper.join(room_id2, user1_id, tok=user1_tok)
        self.helper.join(room_id2, user3_id, tok=user3_tok)
        self.helper.join(room_id2, user4_id, tok=user4_tok)
        # User1 starts typing in room2
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id2}/typing/{user1_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user1_tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)
        # User2 starts typing in room2
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id2}/typing/{user2_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user2_tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)

        # Make an initial Sliding Sync request with the typing extension enabled
        sync_body = {
            "lists": {},
            "room_subscriptions": {
                room_id1: {
                    "required_state": [],
                    "timeline_limit": 0,
                }
            },
            "extensions": {
                "typing": {
                    "enabled": True,
                    "rooms": [room_id1, room_id2],
                }
            },
        }
        response_body, _ = self.do_sync(sync_body, tok=user1_tok)

        # Even though we requested room2, we only expect room1 to show up because that's
        # the only room in the Sliding Sync response (room2 is not one of our room
        # subscriptions or in a sliding window list).
        self.assertIncludes(
            response_body["extensions"]["typing"].get("rooms").keys(),
            {room_id1},
            exact=True,
        )
        # Sanity check that it's the correct ephemeral event type
        self.assertEqual(
            response_body["extensions"]["typing"]["rooms"][room_id1]["type"],
            EduTypes.TYPING,
        )
        # We can see user1 and user2 typing
        self.assertIncludes(
            set(
                response_body["extensions"]["typing"]["rooms"][room_id1]["content"][
                    "user_ids"
                ]
            ),
            {user1_id, user2_id},
            exact=True,
        )

    def test_typing_incremental_sync(self) -> None:
        """
        On incremental sync, we return all typing notifications in the token range for a
        given room but only for rooms that we request and are being returned in the
        Sliding Sync response.
        """

        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")
        user2_id = self.register_user("user2", "pass")
        user2_tok = self.login(user2_id, "pass")
        user3_id = self.register_user("user3", "pass")
        user3_tok = self.login(user3_id, "pass")

        # Create room1
        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
        self.helper.join(room_id1, user1_id, tok=user1_tok)
        self.helper.join(room_id1, user3_id, tok=user3_tok)
        # User2 starts typing in room1
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id1}/typing/{user2_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user2_tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)

        # Create room2
        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
        self.helper.join(room_id2, user1_id, tok=user1_tok)
        # User1 starts typing in room2 (before the `from_token`)
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id2}/typing/{user1_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user1_tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)

        # Create room3
        room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok)
        self.helper.join(room_id3, user1_id, tok=user1_tok)
        self.helper.join(room_id3, user3_id, tok=user3_tok)

        # Create room4
        room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok)
        self.helper.join(room_id4, user1_id, tok=user1_tok)
        self.helper.join(room_id4, user3_id, tok=user3_tok)
        # User1 starts typing in room4 (before the `from_token`)
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id4}/typing/{user1_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user1_tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)

        # Advance time so all of the typing notifications time out before we make our
        # Sliding Sync requests. Even though these are sent before the `from_token`, the
        # typing code only keeps track of the stream position of the latest typing
        # notification, so "old" typing notifications that are still "alive" (haven't
        # timed out) can appear in the response.
        self.reactor.advance(36)

        sync_body = {
            "lists": {},
            "room_subscriptions": {
                room_id1: {
                    "required_state": [],
                    "timeline_limit": 0,
                },
                room_id3: {
                    "required_state": [],
                    "timeline_limit": 0,
                },
                room_id4: {
                    "required_state": [],
                    "timeline_limit": 0,
                },
            },
            "extensions": {
                "typing": {
                    "enabled": True,
                    "rooms": [room_id1, room_id2, room_id3, room_id4],
                }
            },
        }
        _, from_token = self.do_sync(sync_body, tok=user1_tok)

        # Add some more typing notifications after the `from_token`
        #
        # User1 starts typing in room1
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id1}/typing/{user1_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user1_tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)
        # User1 starts typing in room2
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id2}/typing/{user1_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user1_tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)
        # User3 starts typing in room3
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id3}/typing/{user3_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user3_tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)
        # No activity for room4 after the `from_token`

        # Make an incremental Sliding Sync request with the typing extension enabled
        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

        # Even though we requested room2, we only expect rooms to show up if they are
        # already in the Sliding Sync response. room4 doesn't show up because there is
        # no activity after the `from_token`.
        self.assertIncludes(
            response_body["extensions"]["typing"].get("rooms").keys(),
            {room_id1, room_id3},
            exact=True,
        )

        # Check room1:
        #
        # Sanity check that it's the correct ephemeral event type
        self.assertEqual(
            response_body["extensions"]["typing"]["rooms"][room_id1]["type"],
            EduTypes.TYPING,
        )
        # We only see that user1 is typing in room1 since the `from_token`
        self.assertIncludes(
            set(
                response_body["extensions"]["typing"]["rooms"][room_id1]["content"][
                    "user_ids"
                ]
            ),
            {user1_id},
            exact=True,
        )

        # Check room3:
        #
        # Sanity check that it's the correct ephemeral event type
        self.assertEqual(
            response_body["extensions"]["typing"]["rooms"][room_id3]["type"],
            EduTypes.TYPING,
        )
        # We only see that user3 is typing in room3 since the `from_token`
        self.assertIncludes(
            set(
                response_body["extensions"]["typing"]["rooms"][room_id3]["content"][
                    "user_ids"
                ]
            ),
            {user3_id},
            exact=True,
        )

    def test_wait_for_new_data(self) -> None:
        """
        Test to make sure that the Sliding Sync request waits for new data to arrive.

        (Only applies to incremental syncs with a `timeout` specified)
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")
        user2_id = self.register_user("user2", "pass")
        user2_tok = self.login(user2_id, "pass")

        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
        self.helper.join(room_id, user1_id, tok=user1_tok)

        sync_body = {
            "lists": {},
            "room_subscriptions": {
                room_id: {
                    "required_state": [],
                    "timeline_limit": 0,
                },
            },
            "extensions": {
                "typing": {
                    "enabled": True,
                    "rooms": [room_id],
                }
            },
        }
        _, from_token = self.do_sync(sync_body, tok=user1_tok)

        # Make an incremental Sliding Sync request with the typing extension enabled
        channel = self.make_request(
            "POST",
            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
            content=sync_body,
            access_token=user1_tok,
            await_result=False,
        )
        # Block for 5 seconds to make sure we are in `notifier.wait_for_events(...)`
        with self.assertRaises(TimedOutException):
            channel.await_result(timeout_ms=5000)
        # Bump the typing status to trigger new results
        typing_channel = self.make_request(
            "PUT",
            f"/rooms/{room_id}/typing/{user2_id}",
            b'{"typing": true, "timeout": 30000}',
            access_token=user2_tok,
        )
        self.assertEqual(typing_channel.code, 200, typing_channel.json_body)
        # Should respond before the 10 second timeout
        channel.await_result(timeout_ms=3000)
        self.assertEqual(channel.code, 200, channel.json_body)

        # We should see the new typing notification
        self.assertIncludes(
            channel.json_body.get("extensions", {})
            .get("typing", {})
            .get("rooms", {})
            .keys(),
            {room_id},
            exact=True,
            message=str(channel.json_body),
        )
        self.assertIncludes(
            set(
                channel.json_body["extensions"]["typing"]["rooms"][room_id]["content"][
                    "user_ids"
                ]
            ),
            {user2_id},
            exact=True,
        )

    def test_wait_for_new_data_timeout(self) -> None:
        """
        Test to make sure that the Sliding Sync request waits for new data to arrive but
        no data ever arrives, so we time out. We're also making sure that the default
        data from the typing extension doesn't trigger a false-positive for new data.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        sync_body = {
            "lists": {},
            "extensions": {
                "typing": {
                    "enabled": True,
                }
            },
        }
        _, from_token = self.do_sync(sync_body, tok=user1_tok)

        # Make the Sliding Sync request
        channel = self.make_request(
            "POST",
            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
            content=sync_body,
            access_token=user1_tok,
            await_result=False,
        )
        # Block for 5 seconds to make sure we are in `notifier.wait_for_events(...)`
        with self.assertRaises(TimedOutException):
            channel.await_result(timeout_ms=5000)
        # Wake up `notifier.wait_for_events(...)`, which will cause us to test
        # `SlidingSyncResult.__bool__` for new results.
        self._bump_notifier_wait_for_events(
            user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
        )
        # Block for a little bit more to ensure we don't see any new results.
        with self.assertRaises(TimedOutException):
            channel.await_result(timeout_ms=4000)
        # Wait for the sync to complete (wait for the rest of the 10 second timeout,
        # 5000 + 4000 + 1200 > 10000)
        channel.await_result(timeout_ms=1200)
        self.assertEqual(channel.code, 200, channel.json_body)

        self.assertIncludes(
            channel.json_body["extensions"]["typing"].get("rooms").keys(),
            set(),
            exact=True,
        )
tests/rest/client/sliding_sync/test_extensions.py (new file, 290 lines)
@@ -0,0 +1,290 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import Literal

from parameterized import parameterized
from typing_extensions import assert_never

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
from synapse.api.constants import ReceiptTypes
from synapse.rest.client import login, receipts, room, sync
from synapse.server import HomeServer
from synapse.util import Clock

from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase

logger = logging.getLogger(__name__)


class SlidingSyncExtensionsTestCase(SlidingSyncBase):
    """
    Test general extensions behavior in the Sliding Sync API. Each extension has its
    own suite of tests in its own file as well.
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
        sync.register_servlets,
        receipts.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main
        self.storage_controllers = hs.get_storage_controllers()
        self.account_data_handler = hs.get_account_data_handler()

    # Any extensions that use `lists`/`rooms` should be tested here
    @parameterized.expand([("account_data",), ("receipts",), ("typing",)])
    def test_extensions_lists_rooms_relevant_rooms(
        self,
        extension_name: Literal["account_data", "receipts", "typing"],
    ) -> None:
        """
        With various extensions, test out requesting different variations of
        `lists`/`rooms`.

        Stresses `SlidingSyncHandler.find_relevant_room_ids_for_extension(...)`
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        # Create some rooms
        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
        room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
        room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)

        room_id_to_human_name_map = {
            room_id1: "room1",
            room_id2: "room2",
            room_id3: "room3",
            room_id4: "room4",
            room_id5: "room5",
        }

        for room_id in room_id_to_human_name_map.keys():
            if extension_name == "account_data":
                # Add some account data to each room
                self.get_success(
                    self.account_data_handler.add_account_data_to_room(
                        user_id=user1_id,
                        room_id=room_id,
                        account_data_type="org.matrix.roorarraz",
                        content={"roo": "rar"},
                    )
                )
            elif extension_name == "receipts":
                event_response = self.helper.send(
                    room_id, body="new event", tok=user1_tok
                )
                # Read last event
                channel = self.make_request(
                    "POST",
                    f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response['event_id']}",
                    {},
                    access_token=user1_tok,
                )
                self.assertEqual(channel.code, 200, channel.json_body)
            elif extension_name == "typing":
                # Start a typing notification
                channel = self.make_request(
                    "PUT",
                    f"/rooms/{room_id}/typing/{user1_id}",
                    b'{"typing": true, "timeout": 30000}',
                    access_token=user1_tok,
                )
                self.assertEqual(channel.code, 200, channel.json_body)
            else:
                assert_never(extension_name)

        main_sync_body = {
            "lists": {
                # We expect this list range to include room5 and room4
                "foo-list": {
                    "ranges": [[0, 1]],
                    "required_state": [],
                    "timeline_limit": 0,
                },
                # We expect this list range to include room5, room4, room3
                "bar-list": {
                    "ranges": [[0, 2]],
                    "required_state": [],
                    "timeline_limit": 0,
                },
            },
            "room_subscriptions": {
                room_id1: {
                    "required_state": [],
                    "timeline_limit": 0,
                }
            },
        }

        # Mix lists and rooms
        sync_body = {
            **main_sync_body,
            "extensions": {
                extension_name: {
                    "enabled": True,
                    "lists": ["foo-list", "non-existent-list"],
                    "rooms": [room_id1, room_id2, "!non-existent-room"],
                }
            },
        }
        response_body, _ = self.do_sync(sync_body, tok=user1_tok)

        # room1: ✅ Requested via `rooms` and a room subscription exists
        # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
        # room3: ❌ Not requested
        # room4: ✅ Shows up because requested via `lists` and list exists in the response
        # room5: ✅ Shows up because requested via `lists` and list exists in the response
        self.assertIncludes(
            {
                room_id_to_human_name_map[room_id]
                for room_id in response_body["extensions"][extension_name]
                .get("rooms")
                .keys()
            },
            {"room1", "room4", "room5"},
            exact=True,
        )

        # Try wildcards (this is the default)
        sync_body = {
            **main_sync_body,
            "extensions": {
                extension_name: {
                    "enabled": True,
                    # "lists": ["*"],
                    # "rooms": ["*"],
                }
            },
        }
        response_body, _ = self.do_sync(sync_body, tok=user1_tok)

        # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
        # room2: ❌ Not requested
        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
        self.assertIncludes(
            {
                room_id_to_human_name_map[room_id]
                for room_id in response_body["extensions"][extension_name]
                .get("rooms")
                .keys()
            },
            {"room1", "room3", "room4", "room5"},
            exact=True,
        )

        # Empty list will return nothing
        sync_body = {
            **main_sync_body,
            "extensions": {
                extension_name: {
                    "enabled": True,
                    "lists": [],
                    "rooms": [],
                }
            },
        }
        response_body, _ = self.do_sync(sync_body, tok=user1_tok)

        # room1: ❌ Not requested
        # room2: ❌ Not requested
        # room3: ❌ Not requested
        # room4: ❌ Not requested
        # room5: ❌ Not requested
        self.assertIncludes(
            {
                room_id_to_human_name_map[room_id]
                for room_id in response_body["extensions"][extension_name]
                .get("rooms")
                .keys()
            },
            set(),
            exact=True,
        )

        # Try wildcard and none
        sync_body = {
            **main_sync_body,
            "extensions": {
                extension_name: {
                    "enabled": True,
                    "lists": ["*"],
                    "rooms": [],
                }
            },
        }
        response_body, _ = self.do_sync(sync_body, tok=user1_tok)

        # room1: ❌ Not requested
        # room2: ❌ Not requested
        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
        self.assertIncludes(
            {
                room_id_to_human_name_map[room_id]
                for room_id in response_body["extensions"][extension_name]
                .get("rooms")
                .keys()
            },
            {"room3", "room4", "room5"},
            exact=True,
        )

        # Try requesting a room that is only in a list
        sync_body = {
            **main_sync_body,
            "extensions": {
                extension_name: {
                    "enabled": True,
                    "lists": [],
                    "rooms": [room_id5],
                }
            },
        }
        response_body, _ = self.do_sync(sync_body, tok=user1_tok)

        # room1: ❌ Not requested
        # room2: ❌ Not requested
        # room3: ❌ Not requested
        # room4: ❌ Not requested
        # room5: ✅ Requested via `rooms` and is in a list
        self.assertIncludes(
            {
                room_id_to_human_name_map[room_id]
                for room_id in response_body["extensions"][extension_name]
                .get("rooms")
                .keys()
            },
            {"room5"},
            exact=True,
        )
tests/rest/client/sliding_sync/test_sliding_sync.py (new file, 195 lines)
@@ -0,0 +1,195 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import Any, Iterable, Literal, Optional, Tuple

from typing_extensions import assert_never

from synapse.events import EventBase
from synapse.types import JsonDict, StreamKeyType, StreamToken
from synapse.util.stringutils import random_string

from tests import unittest

logger = logging.getLogger(__name__)


class SlidingSyncBase(unittest.HomeserverTestCase):
    """Base class for sliding sync test cases"""

    sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"

    def default_config(self) -> JsonDict:
        config = super().default_config()
        # Enable sliding sync
        config["experimental_features"] = {"msc3575_enabled": True}
        return config

    def do_sync(
        self, sync_body: JsonDict, *, since: Optional[str] = None, tok: str
    ) -> Tuple[JsonDict, str]:
        """Do a sliding sync request with given body.

        Asserts the request was successful.

        Args:
            sync_body: The full request body to use
            since: Optional since token
            tok: Access token to use

        Returns:
            A tuple of the response body and the `pos` field.
        """

        sync_path = self.sync_endpoint
        if since:
            sync_path += f"?pos={since}"

        channel = self.make_request(
            method="POST",
            path=sync_path,
            content=sync_body,
            access_token=tok,
        )
        self.assertEqual(channel.code, 200, channel.json_body)

        return channel.json_body, channel.json_body["pos"]

    def _assertRequiredStateIncludes(
        self,
        actual_required_state: Any,
        expected_state_events: Iterable[EventBase],
        exact: bool = False,
    ) -> None:
        """
        Wrapper around `assertIncludes` to give slightly better looking diff error
        messages that include some context "$event_id (type, state_key)".

        Args:
            actual_required_state: The "required_state" of a room from a Sliding Sync
                request response.
            expected_state_events: The expected state events to be included in the
                `actual_required_state`.
            exact: Whether the actual state should be exactly equal to the expected
                state (no extras).
        """

        assert isinstance(actual_required_state, list)
        for event in actual_required_state:
            assert isinstance(event, dict)

        self.assertIncludes(
            {
                f'{event["event_id"]} ("{event["type"]}", "{event["state_key"]}")'
                for event in actual_required_state
            },
            {
                f'{event.event_id} ("{event.type}", "{event.state_key}")'
                for event in expected_state_events
            },
            exact=exact,
            # Message to help understand the diff in context
            message=str(actual_required_state),
        )

    def _bump_notifier_wait_for_events(
        self,
        user_id: str,
        wake_stream_key: Literal[
            StreamKeyType.ACCOUNT_DATA,
            StreamKeyType.PRESENCE,
        ],
    ) -> None:
        """
        Wake up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
        Sync results.

        Args:
            user_id: The user ID to wake up the notifier for
            wake_stream_key: The stream key to wake up. This will create an actual new
                entity in that stream so it's best to choose one that won't affect the
                Sliding Sync results you're testing for. In other words, if you're
                testing account data, choose `StreamKeyType.PRESENCE` instead. We support
                two possible stream keys because you're probably testing one or the other
                so one is always a "safe" option.
        """
        # We're expecting some new activity from this point onwards
        from_token = self.hs.get_event_sources().get_current_token()

        triggered_notifier_wait_for_events = False

        async def _on_new_acivity(
            before_token: StreamToken, after_token: StreamToken
        ) -> bool:
            nonlocal triggered_notifier_wait_for_events
            triggered_notifier_wait_for_events = True
            return True

        notifier = self.hs.get_notifier()

        # Listen for some new activity for the user. We're just trying to confirm that
        # our bump below actually does what we think it does (triggers new activity for
        # the user).
        result_awaitable = notifier.wait_for_events(
            user_id,
            1000,
            _on_new_acivity,
            from_token=from_token,
        )

        # Update the account data or presence so that `notifier.wait_for_events(...)`
        # wakes up. We chose these two options because they're least likely to show up
        # in the Sliding Sync response so it won't affect whether we have results.
        if wake_stream_key == StreamKeyType.ACCOUNT_DATA:
            self.get_success(
                self.hs.get_account_data_handler().add_account_data_for_user(
                    user_id,
                    "org.matrix.foobarbaz",
                    {"foo": "bar"},
                )
            )
        elif wake_stream_key == StreamKeyType.PRESENCE:
            sending_user_id = self.register_user(
                "user_bump_notifier_wait_for_events_" + random_string(10), "pass"
            )
            sending_user_tok = self.login(sending_user_id, "pass")
            test_msg = {"foo": "bar"}
            chan = self.make_request(
                "PUT",
                "/_matrix/client/r0/sendToDevice/m.test/1234",
                content={"messages": {user_id: {"d1": test_msg}}},
                access_token=sending_user_tok,
            )
            self.assertEqual(chan.code, 200, chan.result)
        else:
            assert_never(wake_stream_key)

        # Wait for our notifier result
        self.get_success(result_awaitable)

        if not triggered_notifier_wait_for_events:
            raise AssertionError(
                "Expected `notifier.wait_for_events(...)` to be triggered"
            )


# FIXME: Do not merge like this. We need to resolve everything after
# https://github.com/element-hq/synapse/pull/17504 merges.