Mirror of https://github.com/element-hq/synapse.git (synced 2025-12-05 01:10:13 +00:00)

Compare commits: develop...erikj/ss_i (15 commits)

| SHA1 |
|---|
| 234e4cb83d |
| 27848818c2 |
| 2968f2e3b8 |
| a90c40812a |
| e2c47bf4e8 |
| de6e3bdee8 |
| 185831754e |
| e2a88e44ef |
| 53273db3e8 |
| d44f7e12b1 |
| f3a4cfb8b4 |
| f3030af575 |
| e8df0d78a2 |
| 1ad1cce3f2 |
| 30263b43c2 |
changelog.d/17447.feature (new file, 1 line)
@@ -0,0 +1 @@
Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
changelog.d/17452.misc (new file, 1 line)
@@ -0,0 +1 @@
Change sliding sync to use its own token format in preparation for storing per-connection state.
@@ -201,8 +201,8 @@ netaddr = ">=0.7.18"
|
||||
# add a lower bound to the Jinja2 dependency.
|
||||
Jinja2 = ">=3.0"
|
||||
bleach = ">=1.4.3"
|
||||
# We use `Self`, which was added in `typing-extensions` 4.0.
typing-extensions = ">=4.0"
# We use `assert_never`, which was added in `typing-extensions` 4.1.
typing-extensions = ">=4.1"
|
||||
# We enforce that we have a `cryptography` version that bundles an `openssl`
|
||||
# with the latest security patches.
|
||||
cryptography = ">=3.4.7"
|
||||
|
||||
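The `typing-extensions` lower bound moves from 4.0 to 4.1 because the handler below starts using `assert_never` for exhaustiveness checks over the new `HaveSentRoomFlag` enum. A minimal sketch of the pattern with a toy enum (not the real one):

```python
from enum import Enum

from typing_extensions import assert_never


class Direction(Enum):
    FORWARDS = "f"
    BACKWARDS = "b"


def arrow(direction: Direction) -> str:
    # Every member is handled explicitly. If a new member were added, type
    # checkers would error on the assert_never() call (its argument would no
    # longer be of type Never), and at runtime it raises if ever reached.
    if direction is Direction.FORWARDS:
        return "->"
    elif direction is Direction.BACKWARDS:
        return "<-"
    else:
        assert_never(direction)
```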
@@ -18,11 +18,13 @@
|
||||
#
|
||||
#
|
||||
import logging
|
||||
from enum import Enum
|
||||
from itertools import chain
|
||||
from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple
|
||||
|
||||
import attr
|
||||
from immutabledict import immutabledict
|
||||
from typing_extensions import assert_never
|
||||
|
||||
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
|
||||
from synapse.events import EventBase
|
||||
@@ -36,7 +38,9 @@ from synapse.types import (
|
||||
PersistedEventPosition,
|
||||
Requester,
|
||||
RoomStreamToken,
|
||||
SlidingSyncStreamToken,
|
||||
StateMap,
|
||||
StrCollection,
|
||||
StreamKeyType,
|
||||
StreamToken,
|
||||
UserID,
|
||||
@@ -339,11 +343,13 @@ class SlidingSyncHandler:
|
||||
self.relations_handler = hs.get_relations_handler()
|
||||
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
|
||||
|
||||
self.connection_store = SlidingSyncConnectionStore()
|
||||
|
||||
async def wait_for_sync_for_user(
|
||||
self,
|
||||
requester: Requester,
|
||||
sync_config: SlidingSyncConfig,
|
||||
from_token: Optional[StreamToken] = None,
|
||||
from_token: Optional[SlidingSyncStreamToken] = None,
|
||||
timeout_ms: int = 0,
|
||||
) -> SlidingSyncResult:
|
||||
"""
|
||||
@@ -378,7 +384,7 @@ class SlidingSyncHandler:
|
||||
# this returns false, it means we timed out waiting, and we should
|
||||
# just return an empty response.
|
||||
before_wait_ts = self.clock.time_msec()
|
||||
if not await self.notifier.wait_for_stream_token(from_token):
|
||||
if not await self.notifier.wait_for_stream_token(from_token.stream_token):
|
||||
logger.warning(
|
||||
"Timed out waiting for worker to catch up. Returning empty response"
|
||||
)
|
||||
@@ -416,7 +422,7 @@ class SlidingSyncHandler:
|
||||
sync_config.user.to_string(),
|
||||
timeout_ms,
|
||||
current_sync_callback,
|
||||
from_token=from_token,
|
||||
from_token=from_token.stream_token,
|
||||
)
|
||||
|
||||
return result
|
||||
@@ -425,7 +431,7 @@ class SlidingSyncHandler:
|
||||
self,
|
||||
sync_config: SlidingSyncConfig,
|
||||
to_token: StreamToken,
|
||||
from_token: Optional[StreamToken] = None,
|
||||
from_token: Optional[SlidingSyncStreamToken] = None,
|
||||
) -> SlidingSyncResult:
|
||||
"""
|
||||
Generates the response body of a Sliding Sync result, represented as a
|
||||
@@ -446,6 +452,12 @@ class SlidingSyncHandler:
|
||||
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
||||
raise NotImplementedError()
|
||||
|
||||
await self.connection_store.mark_token_seen(
|
||||
user_id,
|
||||
conn_id=sync_config.connection_id(),
|
||||
from_token=from_token,
|
||||
)
|
||||
|
||||
# Get all of the room IDs that the user should be able to see in the sync
|
||||
# response
|
||||
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
|
||||
@@ -458,7 +470,7 @@ class SlidingSyncHandler:
|
||||
await self.get_room_membership_for_user_at_to_token(
|
||||
user=sync_config.user,
|
||||
to_token=to_token,
|
||||
from_token=from_token,
|
||||
from_token=from_token.stream_token if from_token else None,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -589,11 +601,36 @@ class SlidingSyncHandler:
|
||||
else:
|
||||
relevant_room_map[room_id] = room_sync_config
|
||||
|
||||
# Filter out rooms that haven't received updates and we've sent down
|
||||
# previously.
|
||||
if from_token:
|
||||
rooms_should_send = set()
|
||||
for room_id in relevant_room_map:
|
||||
status = await self.connection_store.have_sent_room(
|
||||
user_id,
|
||||
sync_config.connection_id(),
|
||||
from_token.connection_token,
|
||||
room_id,
|
||||
)
|
||||
if status.status != HaveSentRoomFlag.LIVE:
|
||||
rooms_should_send.add(room_id)
|
||||
|
||||
# TODO: Also check current state delta stream
|
||||
rooms_that_have_updates = (
|
||||
self.store._events_stream_cache.get_entities_changed(
|
||||
relevant_room_map, from_token.stream_token.room_key.stream
|
||||
)
|
||||
)
|
||||
rooms_should_send.update(rooms_that_have_updates)
|
||||
relevant_room_map = {
|
||||
r: c for r, c in relevant_room_map.items() if r in rooms_should_send
|
||||
}
|
||||
|
||||
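The filtering above combines two signals: any room whose per-connection status is not `LIVE` must be (re)sent, and any room the events stream change cache reports as possibly changed since the request's stream position is sent too. A standalone sketch of that set logic, with plain strings standing in for the store and cache (illustrative only):

```python
from typing import AbstractSet, Dict, Set


def rooms_to_send(
    room_statuses: Dict[str, str],  # room_id -> "NEVER" | "PREVIOUSLY" | "LIVE"
    rooms_changed_since_token: AbstractSet[str],  # from the stream change cache
) -> Set[str]:
    # Rooms this connection has not fully caught up on are always sent.
    result = {
        room_id for room_id, status in room_statuses.items() if status != "LIVE"
    }
    # The change cache is conservative: it may over-report changed rooms but
    # never misses one, so unioning it in is safe.
    result |= rooms_changed_since_token & room_statuses.keys()
    return result


# A LIVE room with no new events drops out of the response entirely:
assert rooms_to_send({"!a:x": "LIVE", "!b:x": "NEVER"}, set()) == {"!b:x"}
```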
# Fetch room data
|
||||
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
|
||||
for room_id, room_sync_config in relevant_room_map.items():
|
||||
room_sync_result = await self.get_room_sync_data(
|
||||
user=sync_config.user,
|
||||
sync_config=sync_config,
|
||||
room_id=room_id,
|
||||
room_sync_config=room_sync_config,
|
||||
room_membership_for_user_at_to_token=room_membership_for_user_map[
|
||||
@@ -609,8 +646,21 @@ class SlidingSyncHandler:
|
||||
sync_config=sync_config, to_token=to_token
|
||||
)
|
||||
|
||||
if has_lists or has_room_subscriptions:
|
||||
connection_token = await self.connection_store.record_rooms(
|
||||
user_id,
|
||||
conn_id=sync_config.connection_id(),
|
||||
from_token=from_token,
|
||||
sent_room_ids=relevant_room_map.keys(),
|
||||
unsent_room_ids=[],  # TODO: We currently assume that we have sent down all updates.
|
||||
)
|
||||
elif from_token:
|
||||
connection_token = from_token.connection_token
|
||||
else:
|
||||
connection_token = 0
|
||||
|
||||
return SlidingSyncResult(
|
||||
next_pos=to_token,
|
||||
next_pos=SlidingSyncStreamToken(to_token, connection_token),
|
||||
lists=lists,
|
||||
rooms=rooms,
|
||||
extensions=extensions,
|
||||
@@ -1342,11 +1392,11 @@ class SlidingSyncHandler:
|
||||
|
||||
async def get_room_sync_data(
|
||||
self,
|
||||
user: UserID,
|
||||
sync_config: SlidingSyncConfig,
|
||||
room_id: str,
|
||||
room_sync_config: RoomSyncConfig,
|
||||
room_membership_for_user_at_to_token: _RoomMembershipForUser,
|
||||
from_token: Optional[StreamToken],
|
||||
from_token: Optional[SlidingSyncStreamToken],
|
||||
to_token: StreamToken,
|
||||
) -> SlidingSyncResult.RoomResult:
|
||||
"""
|
||||
@@ -1364,6 +1414,38 @@ class SlidingSyncHandler:
|
||||
from_token: The point in the stream to sync from.
|
||||
to_token: The point in the stream to sync up to.
|
||||
"""
|
||||
user = sync_config.user
|
||||
|
||||
# Determine whether we should limit the timeline to the token range.
|
||||
#
|
||||
# We should return historical messages (before token range) in the
|
||||
# following cases because we want clients to be able to show a basic
|
||||
# screen of information:
|
||||
# - Initial sync (because no `from_token` to limit us anyway)
|
||||
# - When users `newly_joined`
|
||||
# - For an incremental sync where we haven't sent it down this
|
||||
# connection before
|
||||
to_bound = None
|
||||
initial = True
|
||||
if from_token and not room_membership_for_user_at_to_token.newly_joined:
|
||||
room_status = await self.connection_store.have_sent_room(
|
||||
user_id=user.to_string(),
|
||||
conn_id=sync_config.connection_id(),
|
||||
connection_token=from_token.connection_token,
|
||||
room_id=room_id,
|
||||
)
|
||||
if room_status.status == HaveSentRoomFlag.LIVE:
|
||||
to_bound = from_token.stream_token.room_key
|
||||
initial = False
|
||||
elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
|
||||
assert room_status.last_token is not None
|
||||
to_bound = room_status.last_token
|
||||
initial = False
|
||||
elif room_status.status == HaveSentRoomFlag.NEVER:
|
||||
to_bound = None
|
||||
initial = True
|
||||
else:
|
||||
assert_never(room_status.status)
|
||||
|
||||
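The branching above reduces to a small decision table: no `from_token` or a newly joined room means full history and full state, otherwise the connection's `HaveSentRoom` status chooses how far back the timeline may reach. A condensed, simplified sketch (plain strings stand in for the real token and enum types):

```python
from typing import Optional, Tuple


def timeline_bounds(
    has_from_token: bool,
    newly_joined: bool,
    room_status: str,                    # "LIVE" | "PREVIOUSLY" | "NEVER"
    from_token_room_key: Optional[str],  # stand-in for a RoomStreamToken
    last_sent_up_to: Optional[str],      # HaveSentRoom.last_token when PREVIOUSLY
) -> Tuple[Optional[str], bool]:
    """Return (to_bound, initial): how far back the timeline may reach and
    whether the room should be treated as an initial (full state) room."""
    if not has_from_token or newly_joined:
        return None, True                # initial sync / newly joined: full history
    if room_status == "LIVE":
        return from_token_room_key, False  # only events since the request's token
    if room_status == "PREVIOUSLY":
        return last_sent_up_to, False    # resume from where we last left the room
    return None, True                    # NEVER: behave like an initial room
```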
# Assemble the list of timeline events
|
||||
#
|
||||
@@ -1400,22 +1482,6 @@ class SlidingSyncHandler:
|
||||
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
|
||||
)
|
||||
|
||||
# Determine whether we should limit the timeline to the token range.
|
||||
#
|
||||
# We should return historical messages (before token range) in the
|
||||
# following cases because we want clients to be able to show a basic
|
||||
# screen of information:
|
||||
# - Initial sync (because no `from_token` to limit us anyway)
|
||||
# - When users `newly_joined`
|
||||
# - TODO: For an incremental sync where we haven't sent it down this
|
||||
# connection before
|
||||
to_bound = (
|
||||
from_token.room_key
|
||||
if from_token is not None
|
||||
and not room_membership_for_user_at_to_token.newly_joined
|
||||
else None
|
||||
)
|
||||
|
||||
timeline_events, new_room_key = await self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=from_bound,
|
||||
@@ -1477,7 +1543,9 @@ class SlidingSyncHandler:
|
||||
instance_name=timeline_event.internal_metadata.instance_name,
|
||||
stream=timeline_event.internal_metadata.stream_ordering,
|
||||
)
|
||||
if persisted_position.persisted_after(from_token.room_key):
|
||||
if persisted_position.persisted_after(
|
||||
from_token.stream_token.room_key
|
||||
):
|
||||
num_live += 1
|
||||
else:
|
||||
# Since we're iterating over the timeline events in
|
||||
@@ -1534,12 +1602,6 @@ class SlidingSyncHandler:
|
||||
# indicate to the client that a state reset happened. Perhaps we should indicate
|
||||
# this by setting `initial: True` and empty `required_state`.
|
||||
|
||||
# TODO: Since we can't determine whether we've already sent a room down this
|
||||
# Sliding Sync connection before (we plan to add this optimization in the
|
||||
# future), we're always returning the requested room state instead of
|
||||
# updates.
|
||||
initial = True
|
||||
|
||||
# Check whether the room has a name set
|
||||
name_state_ids = await self.get_current_state_ids_at(
|
||||
room_id=room_id,
|
||||
@@ -1683,9 +1745,17 @@ class SlidingSyncHandler:
|
||||
to_token=to_token,
|
||||
)
|
||||
else:
|
||||
# TODO: Once we can figure out if we've sent a room down this connection before,
|
||||
# we can return updates instead of the full required state.
|
||||
raise NotImplementedError()
|
||||
assert to_bound is not None
|
||||
|
||||
deltas = await self.store.get_current_state_deltas_for_room(
|
||||
room_id, to_bound, to_token.room_key
|
||||
)
|
||||
# TODO: Filter room state before fetching events
|
||||
# TODO: Handle state resets where event_id is None
|
||||
events = await self.store.get_events(
|
||||
[d.event_id for d in deltas if d.event_id]
|
||||
)
|
||||
room_state = {(s.type, s.state_key): s for s in events.values()}
|
||||
|
||||
required_room_state: StateMap[EventBase] = {}
|
||||
if required_state_filter != StateFilter.none():
|
||||
@@ -1799,7 +1869,7 @@ class SlidingSyncHandler:
|
||||
"""
|
||||
|
||||
user_id = sync_config.user.to_string()
|
||||
device_id = sync_config.device_id
|
||||
device_id = sync_config.requester.device_id
|
||||
|
||||
# Check that this request has a valid device ID (not all requests have
|
||||
# to belong to a device, and so device_id is None), and that the
|
||||
@@ -1855,3 +1925,198 @@ class SlidingSyncHandler:
|
||||
next_batch=f"{stream_id}",
|
||||
events=messages,
|
||||
)
|
||||
|
||||
|
||||
class HaveSentRoomFlag(Enum):
|
||||
"""Flag for whether we have sent the room down a sliding sync connection.
|
||||
|
||||
The valid state changes here are:
|
||||
NEVER -> LIVE
|
||||
LIVE -> PREVIOUSLY
|
||||
PREVIOUSLY -> LIVE
|
||||
"""
|
||||
|
||||
# The room has never been sent down (or we have forgotten we have sent it
|
||||
# down).
|
||||
NEVER = 1
|
||||
|
||||
# We have previously sent the room down, but there are updates that we
|
||||
# haven't sent down.
|
||||
PREVIOUSLY = 2
|
||||
|
||||
# We have sent the room down and the client has received all updates.
|
||||
LIVE = 3
|
||||
|
||||
|
||||
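The docstring above defines a small per-(connection, room) state machine. Spelled out as data, the allowed transitions are just these three pairs (illustrative helper, not part of the diff):

```python
from enum import Enum


class Flag(Enum):  # mirrors HaveSentRoomFlag
    NEVER = 1
    PREVIOUSLY = 2
    LIVE = 3


ALLOWED_TRANSITIONS = {
    (Flag.NEVER, Flag.LIVE),       # room sent down for the first time
    (Flag.LIVE, Flag.PREVIOUSLY),  # room had updates we chose not to send
    (Flag.PREVIOUSLY, Flag.LIVE),  # room sent down again, client caught up
}


def check_transition(old: Flag, new: Flag) -> None:
    if old is not new and (old, new) not in ALLOWED_TRANSITIONS:
        raise ValueError(f"invalid transition {old.name} -> {new.name}")
```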
@attr.s(auto_attribs=True, slots=True, frozen=True)
|
||||
class HaveSentRoom:
|
||||
"""Whether we have sent the room down a sliding sync connection.
|
||||
|
||||
Attributes:
|
||||
status: Flag for whether we have sent the room down or not
|
||||
last_token: If the flag is `PREVIOUSLY` then this is non-null and
|
||||
contains the last stream token of the last updates we sent down
|
||||
the room, i.e. we still need to send everything since then to the
|
||||
client.
|
||||
"""
|
||||
|
||||
status: HaveSentRoomFlag
|
||||
last_token: Optional[RoomStreamToken]
|
||||
|
||||
@staticmethod
|
||||
def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
|
||||
"""Constructor for `PREVIOUSLY` flag."""
|
||||
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
|
||||
|
||||
|
||||
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
|
||||
HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class SlidingSyncConnectionStore:
|
||||
"""In-memory store of per-connection state, including what rooms we have
|
||||
previously sent down a sliding sync connection.
|
||||
|
||||
Note: This is NOT safe to run in a worker setup.
|
||||
|
||||
The complication here is that we need to handle requests being resent, i.e.
|
||||
if we sent down a room in a response that the client doesn't receive, we must
consider the room *not* sent when we get the request again.
|
||||
|
||||
This is handled by using an integer "token", which is returned to the client
|
||||
as part of the sync token. For each connection we store a mapping from
|
||||
tokens to the room states, and create a new entry when we send down new
|
||||
rooms.
|
||||
|
||||
Note that for any given sliding sync connection we will only store a maximum
|
||||
of two different tokens: the previous token from the request and a new token
|
||||
sent in the response. When we receive a request with a given token, we then
|
||||
clear out all other entries with a different token.
|
||||
|
||||
Attributes:
|
||||
_connections: Mapping from `(user_id, conn_id)` to mapping of `token`
|
||||
to mapping of room ID to `HaveSentRoom`.
|
||||
"""
|
||||
|
||||
# `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
|
||||
_connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
|
||||
attr.Factory(dict)
|
||||
)
|
||||
|
||||
async def have_sent_room(
|
||||
self, user_id: str, conn_id: str, connection_token: int, room_id: str
|
||||
) -> HaveSentRoom:
|
||||
"""Whether for the given user_id/conn_id/token, return whether we have
|
||||
previously sent the room down
|
||||
"""
|
||||
|
||||
sync_statuses = self._connections.setdefault((user_id, conn_id), {})
|
||||
room_status = sync_statuses.get(connection_token, {}).get(
|
||||
room_id, HAVE_SENT_ROOM_NEVER
|
||||
)
|
||||
|
||||
return room_status
|
||||
|
||||
async def record_rooms(
|
||||
self,
|
||||
user_id: str,
|
||||
conn_id: str,
|
||||
from_token: Optional[SlidingSyncStreamToken],
|
||||
*,
|
||||
sent_room_ids: StrCollection,
|
||||
unsent_room_ids: StrCollection,
|
||||
) -> int:
|
||||
"""Record which rooms we have/haven't sent down in a new response
|
||||
|
||||
Attributes:
|
||||
user_id
|
||||
conn_id
|
||||
from_token: The since token from the request, if any
|
||||
sent_room_ids: The set of room IDs that we have sent down as
|
||||
part of this request (only needs to be ones we didn't
|
||||
previously sent down).
|
||||
unsent_room_ids: The set of room IDs that have had updates
|
||||
since the `last_room_token`, but which were not included in
|
||||
this request
|
||||
"""
|
||||
prev_connection_token = 0
|
||||
if from_token is not None:
|
||||
prev_connection_token = from_token.connection_token
|
||||
|
||||
# If there are no changes then this is a noop.
|
||||
if not sent_room_ids and not unsent_room_ids:
|
||||
return prev_connection_token
|
||||
|
||||
sync_statuses = self._connections.setdefault((user_id, conn_id), {})
|
||||
|
||||
# Generate a new token, removing any existing entries in that token
|
||||
# (which can happen if requests get resent).
|
||||
new_store_token = prev_connection_token + 1
|
||||
sync_statuses.pop(new_store_token, None)
|
||||
|
||||
# Copy over and update the room mappings.
|
||||
new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
|
||||
|
||||
# Whether we have updated the `new_room_statuses`, if we don't by the
|
||||
# end we can treat this as a noop.
|
||||
have_updated = False
|
||||
for room_id in sent_room_ids:
|
||||
new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
|
||||
have_updated = True
|
||||
|
||||
# Whether we add/update the entries for unsent rooms depends on the
|
||||
# existing entry:
|
||||
# - LIVE: We have previously sent down everything up to
|
||||
# `last_room_token`, so we update the entry to be `PREVIOUSLY` with
|
||||
# `last_room_token`.
|
||||
# - PREVIOUSLY: We have previously sent down everything up to *a*
|
||||
# given token, so we don't need to update the entry.
|
||||
# - NEVER: We have never previously sent down the room, and we haven't
|
||||
# sent anything down this time either so we leave it as NEVER.
|
||||
|
||||
# Work out the new state for unsent rooms that were `LIVE`.
|
||||
if from_token:
|
||||
new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
|
||||
else:
|
||||
new_unsent_state = HAVE_SENT_ROOM_NEVER
|
||||
|
||||
for room_id in unsent_room_ids:
|
||||
prev_state = new_room_statuses.get(room_id)
|
||||
if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
|
||||
new_room_statuses[room_id] = new_unsent_state
|
||||
have_updated = True
|
||||
|
||||
if not have_updated:
|
||||
return prev_connection_token
|
||||
|
||||
sync_statuses[new_store_token] = new_room_statuses
|
||||
|
||||
return new_store_token
|
||||
|
||||
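A sketch of how the store is driven across two responses, using the classes added in this diff (module paths assumed from the hunk context; `StreamToken.START` is only a convenient placeholder for a real stream position):

```python
import asyncio

from synapse.handlers.sliding_sync import SlidingSyncConnectionStore
from synapse.types import SlidingSyncStreamToken, StreamToken


async def demo() -> None:
    store = SlidingSyncConnectionStore()
    user_id, conn_id = "@alice:example.org", "D/DEVICEID/"

    # Initial response: no from_token, one room sent in full.
    token1 = await store.record_rooms(
        user_id,
        conn_id,
        from_token=None,
        sent_room_ids=["!room1:example.org"],
        unsent_room_ids=[],
    )
    status = await store.have_sent_room(user_id, conn_id, token1, "!room1:example.org")
    assert status.status.name == "LIVE"  # client has everything for room1

    # The next request carries token1 back to us as part of its `pos`.
    from_token = SlidingSyncStreamToken(StreamToken.START, token1)

    # This time room1 had updates we chose not to send, and room2 was sent.
    token2 = await store.record_rooms(
        user_id,
        conn_id,
        from_token=from_token,
        sent_room_ids=["!room2:example.org"],
        unsent_room_ids=["!room1:example.org"],
    )
    # room1 is now PREVIOUSLY (pinned at from_token's room position), room2
    # is LIVE, and a fresh connection token is handed out.
    assert token2 == token1 + 1


asyncio.run(demo())
```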
async def mark_token_seen(
|
||||
self,
|
||||
user_id: str,
|
||||
conn_id: str,
|
||||
from_token: Optional[SlidingSyncStreamToken],
|
||||
) -> None:
|
||||
"""We have received a request with the given token, so we can clear out
|
||||
any other tokens associated with the connection.
|
||||
|
||||
If there is no from token then we have started afresh, and so we delete
|
||||
all tokens associated with the device.
|
||||
"""
|
||||
# Clear out any tokens for the connection that don't match the one
|
||||
# from the request.
|
||||
|
||||
sync_statuses = self._connections.pop((user_id, conn_id), {})
|
||||
if from_token is None:
|
||||
return
|
||||
|
||||
sync_statuses = {
|
||||
i: room_statuses
|
||||
for i, room_statuses in sync_statuses.items()
|
||||
if i == from_token.connection_token
|
||||
}
|
||||
if sync_statuses:
|
||||
self._connections[(user_id, conn_id)] = sync_statuses
|
||||
|
||||
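Resent requests are what `mark_token_seen` guards against: when a request arrives carrying connection token N, every room map recorded under a different token is dropped, so a response the client never processed is forgotten and its rooms get re-sent. Continuing inside `demo()` from the sketch above:

```python
    # Suppose the response carrying token2 is lost and the client retries
    # with token1. Seeing token1 again discards everything recorded under
    # token2 (room2 = LIVE, room1 = PREVIOUSLY).
    await store.mark_token_seen(
        user_id,
        conn_id,
        from_token=SlidingSyncStreamToken(StreamToken.START, token1),
    )

    # room2 is back to NEVER for this connection, so the retry will send it
    # down in full rather than as a delta.
    status = await store.have_sent_room(user_id, conn_id, token1, "!room2:example.org")
    assert status.status.name == "NEVER"
```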
@@ -54,7 +54,7 @@ from synapse.http.servlet import (
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.opentracing import trace_with_opname
|
||||
from synapse.rest.admin.experimental_features import ExperimentalFeature
|
||||
from synapse.types import JsonDict, Requester, StreamToken
|
||||
from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
from synapse.util import json_decoder
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
@@ -881,7 +881,6 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
)
|
||||
|
||||
user = requester.user
|
||||
device_id = requester.device_id
|
||||
|
||||
timeout = parse_integer(request, "timeout", default=0)
|
||||
# Position in the stream
|
||||
@@ -889,7 +888,9 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
|
||||
from_token = None
|
||||
if from_token_string is not None:
|
||||
from_token = await StreamToken.from_string(self.store, from_token_string)
|
||||
from_token = await SlidingSyncStreamToken.from_string(
|
||||
self.store, from_token_string
|
||||
)
|
||||
|
||||
# TODO: We currently don't know whether we're going to use sticky params or
|
||||
# maybe some filters like sync v2 where they are built up once and referenced
|
||||
@@ -900,11 +901,12 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
|
||||
sync_config = SlidingSyncConfig(
|
||||
user=user,
|
||||
device_id=device_id,
|
||||
requester=requester,
|
||||
# FIXME: Currently, we're just manually copying the fields from the
|
||||
# `SlidingSyncBody` into the config. How can we gurantee into the future
|
||||
# `SlidingSyncBody` into the config. How can we guarantee into the future
|
||||
# that we don't forget any? I would like something more structured like
|
||||
# `copy_attributes(from=body, to=config)`
|
||||
conn_id=body.conn_id,
|
||||
lists=body.lists,
|
||||
room_subscriptions=body.room_subscriptions,
|
||||
extensions=body.extensions,
|
||||
|
||||
@@ -26,6 +26,8 @@ import attr
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.databases.main.stream import _filter_results_by_stream
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -156,3 +158,39 @@ class StateDeltasStore(SQLBaseStore):
|
||||
"get_max_stream_id_in_current_state_deltas",
|
||||
self._get_max_stream_id_in_current_state_deltas_txn,
|
||||
)
|
||||
|
||||
async def get_current_state_deltas_for_room(
|
||||
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
|
||||
) -> List[StateDelta]:
|
||||
"""Get the state deltas between that have happened between two
|
||||
tokens."""
|
||||
|
||||
def get_current_state_deltas_for_room_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[StateDelta]:
|
||||
sql = """
|
||||
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
|
||||
FROM current_state_delta_stream
|
||||
WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
|
||||
ORDER BY stream_id ASC
|
||||
"""
|
||||
txn.execute(
|
||||
sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
|
||||
)
|
||||
|
||||
return [
|
||||
StateDelta(
|
||||
stream_id=row[1],
|
||||
room_id=room_id,
|
||||
event_type=row[2],
|
||||
state_key=row[3],
|
||||
event_id=row[4],
|
||||
prev_event_id=row[5],
|
||||
)
|
||||
for row in txn
|
||||
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||
]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
|
||||
)
|
||||
|
||||
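Worth noting about the query above: the SQL keeps the lower bound exclusive and the upper bound inclusive (`? < stream_id AND stream_id <= ?`), and each row is then re-checked with `_filter_results_by_stream` against the full tokens, since a `RoomStreamToken` can carry per-writer positions that a single integer bound over-approximates. A toy illustration of the interval semantics, with plain integers:

```python
from typing import List, Tuple

# (stream_id, event_id) rows in the order the query returns them.
rows: List[Tuple[int, str]] = [(10, "$a"), (11, "$b"), (12, "$c")]

from_stream, to_stream = 10, 12

# A delta sitting exactly at the from_token was already covered by the
# previous response; the to_token position itself belongs to this one.
selected = [
    event_id for stream_id, event_id in rows if from_stream < stream_id <= to_stream
]
assert selected == ["$b", "$c"]
```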
@@ -1137,6 +1137,43 @@ StreamToken.START = StreamToken(
|
||||
)
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class SlidingSyncStreamToken:
|
||||
"""The same as a `StreamToken`, but includes an extra field at the start for
|
||||
the sliding sync connection token (separated by a '/'). This is used to
|
||||
store per-connection state.
|
||||
|
||||
This then looks something like:
|
||||
5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379
|
||||
"""
|
||||
|
||||
stream_token: StreamToken
|
||||
connection_token: int
|
||||
|
||||
@staticmethod
|
||||
@cancellable
|
||||
async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken":
|
||||
"""Creates a SlidingSyncStreamToken from its textual representation."""
|
||||
try:
|
||||
connection_token_str, stream_token_str = string.split("/", 1)
|
||||
connection_token = int(connection_token_str)
|
||||
stream_token = await StreamToken.from_string(store, stream_token_str)
|
||||
|
||||
return SlidingSyncStreamToken(
|
||||
stream_token=stream_token,
|
||||
connection_token=connection_token,
|
||||
)
|
||||
except CancelledError:
|
||||
raise
|
||||
except Exception:
|
||||
raise SynapseError(400, "Invalid stream token")
|
||||
|
||||
async def to_string(self, store: "DataStore") -> str:
|
||||
"""Serializes the token to a string"""
|
||||
stream_token_str = await self.stream_token.to_string(store)
|
||||
return f"{self.connection_token}/{stream_token_str}"
|
||||
|
||||
|
||||
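A quick round trip of the new serialization, matching the `5/s…` example in the docstring but stripped of the async store plumbing needed to resolve a real `StreamToken`:

```python
# Serialized form: "<connection_token>/<stream_token>"
serialized = "5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379"

# Parsing splits on the first "/" only, because the embedded stream token
# has its own internal structure.
connection_part, stream_part = serialized.split("/", 1)
connection_token = int(connection_part)
assert connection_token == 5

# Serializing is the mirror image.
assert f"{connection_token}/{stream_part}" == serialized
```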
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class PersistedPosition:
|
||||
"""Position of a newly persisted row with instance that persisted it."""
|
||||
|
||||
@@ -31,7 +31,14 @@ else:
|
||||
from pydantic import Extra
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import JsonDict, JsonMapping, StreamToken, UserID
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
JsonMapping,
|
||||
Requester,
|
||||
SlidingSyncStreamToken,
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -102,7 +109,7 @@ class SlidingSyncConfig(SlidingSyncBody):
|
||||
"""
|
||||
|
||||
user: UserID
|
||||
device_id: Optional[str]
|
||||
requester: Requester
|
||||
|
||||
# Pydantic config
|
||||
class Config:
|
||||
@@ -113,6 +120,31 @@ class SlidingSyncConfig(SlidingSyncBody):
|
||||
# Allow custom types like `UserID` to be used in the model
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
def connection_id(self) -> str:
|
||||
"""Return a string identifier for this connection. May clash with
|
||||
connection IDs from different users.
|
||||
|
||||
This is generally a combination of device ID and conn_id. However, both
of these are optional (e.g. puppet access tokens don't have device
IDs), so this handles those edge cases.
|
||||
"""
|
||||
|
||||
# `conn_id` can be null, in which case we default to the empty string
|
||||
# (if conn ID is empty then the client can't have multiple sync loops)
|
||||
conn_id = self.conn_id or ""
|
||||
|
||||
if self.requester.device_id:
|
||||
return f"D/{self.requester.device_id}/{conn_id}"
|
||||
|
||||
if self.requester.access_token_id:
|
||||
# If we don't have a device, then the access token ID should be a
|
||||
# stable ID.
|
||||
return f"A/{self.requester.access_token_id}/{conn_id}"
|
||||
|
||||
# If we have neither then it's likely an AS or some weird token. Either
|
||||
# way we can just fail here.
|
||||
raise Exception("Cannot use sliding sync with access token type")
|
||||
|
||||
|
||||
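So the derived identifier takes one of two shapes, with the `D/` and `A/` prefixes keeping device-based and access-token-based IDs from ever colliding. A standalone mirror of the method above, with illustrative values:

```python
from typing import Optional


def connection_id(
    device_id: Optional[str], access_token_id: Optional[int], conn_id: Optional[str]
) -> str:
    # Mirrors the logic of SlidingSyncConfig.connection_id(), for illustration.
    conn_id = conn_id or ""
    if device_id:
        return f"D/{device_id}/{conn_id}"
    if access_token_id:
        return f"A/{access_token_id}/{conn_id}"
    raise Exception("Cannot use sliding sync with access token type")


assert connection_id("ABCDEFGH", None, "main") == "D/ABCDEFGH/main"
assert connection_id(None, 1234, None) == "A/1234/"
```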
class OperationType(Enum):
|
||||
"""
|
||||
@@ -287,7 +319,7 @@ class SlidingSyncResult:
|
||||
def __bool__(self) -> bool:
|
||||
return bool(self.to_device)
|
||||
|
||||
next_pos: StreamToken
|
||||
next_pos: SlidingSyncStreamToken
|
||||
lists: Dict[str, SlidingWindowList]
|
||||
rooms: Dict[str, RoomResult]
|
||||
extensions: Extensions
|
||||
@@ -300,7 +332,7 @@ class SlidingSyncResult:
|
||||
return bool(self.lists or self.rooms or self.extensions)
|
||||
|
||||
@staticmethod
|
||||
def empty(next_pos: StreamToken) -> "SlidingSyncResult":
|
||||
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
|
||||
"Return a new empty result"
|
||||
return SlidingSyncResult(
|
||||
next_pos=next_pos,
|
||||
|
||||
@@ -120,6 +120,9 @@ class SlidingSyncBody(RequestBodyModel):
|
||||
Sliding Sync API request body.
|
||||
|
||||
Attributes:
|
||||
conn_id: An optional string to identify this connection to the server. If this
|
||||
is missing, only 1 sliding sync connection can be made to the server at
|
||||
any one time.
|
||||
lists: Sliding window API. A map of list key to list information
|
||||
(:class:`SlidingSyncList`). Max lists: 100. The list keys should be
|
||||
arbitrary strings which the client is using to refer to the list. Keep this
|
||||
@@ -315,6 +318,8 @@ class SlidingSyncBody(RequestBodyModel):
|
||||
|
||||
to_device: Optional[ToDeviceExtension] = None
|
||||
|
||||
conn_id: Optional[str]
|
||||
|
||||
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
|
||||
if TYPE_CHECKING:
|
||||
lists: Optional[Dict[str, SlidingSyncList]] = None
|
||||
|
||||
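On the wire, `conn_id` is just another top-level member of the request body; a client running two sync loops from the same device would pick distinct values. An illustrative body in the shape the tests below use (the `conn_id` value here is made up):

```python
request_body = {
    "conn_id": "main",  # distinguishes this sync loop from others on the device
    "lists": {
        "foo-list": {
            "ranges": [[0, 1]],
            "required_state": [],
            "timeline_limit": 4,
        }
    },
}
```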
@@ -50,7 +50,14 @@ from synapse.rest.client import (
|
||||
sync,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
RoomStreamToken,
|
||||
SlidingSyncStreamToken,
|
||||
StreamKeyType,
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests import unittest
|
||||
@@ -1448,7 +1455,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
|
||||
future_position_token_serialized = self.get_success(
|
||||
future_position_token.to_string(self.store)
|
||||
SlidingSyncStreamToken(future_position_token, 0).to_string(self.store)
|
||||
)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
@@ -2608,7 +2615,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
room_id1, "activity before token2", tok=user2_tok
|
||||
)
|
||||
|
||||
from_token = self.event_sources.get_current_token()
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 4,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
from_token = channel.json_body["pos"]
|
||||
|
||||
# Join the room after the `from_token` which will make us consider this room as
|
||||
# `newly_joined`.
|
||||
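The recurring test change above swaps a locally minted `StreamToken` for the `pos` returned by a real initial request, since a bare stream token no longer parses as a sliding sync position. The pattern repeats in every incremental-sync test below and could be factored into a helper along these lines (hypothetical, not part of the diff):

```python
def _initial_sync_pos(self, access_token: str) -> str:
    """Perform a minimal initial sliding sync and return its `pos`, for use
    as the `?pos=` of a later incremental request. (Method on the test case.)"""
    channel = self.make_request(
        "POST",
        self.sync_endpoint,
        {
            "lists": {
                "foo-list": {
                    "ranges": [[0, 1]],
                    "required_state": [],
                    "timeline_limit": 4,
                }
            }
        },
        access_token=access_token,
    )
    self.assertEqual(channel.code, 200, channel.json_body)
    return channel.json_body["pos"]
```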
@@ -2630,8 +2652,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
# Make an incremental Sliding Sync request (what we're trying to test)
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint
|
||||
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
|
||||
self.sync_endpoint + f"?pos={from_token}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -2817,7 +2838,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
|
||||
|
||||
from_token = self.event_sources.get_current_token()
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 4,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
from_token = channel.json_body["pos"]
|
||||
|
||||
self.helper.send(room_id1, "activity after token5", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
|
||||
@@ -2825,8 +2861,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint
|
||||
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
|
||||
self.sync_endpoint + f"?pos={from_token}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -3074,7 +3109,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
|
||||
|
||||
from_token = self.event_sources.get_current_token()
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 4,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
from_token = channel.json_body["pos"]
|
||||
|
||||
self.helper.send(room_id1, "activity after token5", tok=user2_tok)
|
||||
self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
|
||||
@@ -3082,8 +3132,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint
|
||||
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
|
||||
self.sync_endpoint + f"?pos={from_token}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -3239,7 +3288,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
self.helper.send(room_id1, "activity before2", tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
from_token = self.event_sources.get_current_token()
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 4,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
from_token = channel.json_body["pos"]
|
||||
|
||||
event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
|
||||
event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
|
||||
@@ -3255,8 +3319,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint
|
||||
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
|
||||
self.sync_endpoint + f"?pos={from_token}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -3316,15 +3379,29 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
|
||||
self.helper.send(room_id1, "activity after3", tok=user2_tok)
|
||||
|
||||
from_token = self.event_sources.get_current_token()
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 4,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
from_token = channel.json_body["pos"]
|
||||
|
||||
self.helper.send(room_id1, "activity after4", tok=user2_tok)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint
|
||||
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
|
||||
self.sync_endpoint + f"?pos={from_token}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -3451,13 +3528,27 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
after_room_token = self.event_sources.get_current_token()
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 4,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
after_room_token = channel.json_body["pos"]
|
||||
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint
|
||||
+ f"?pos={self.get_success(after_room_token.to_string(self.store))}",
|
||||
self.sync_endpoint + f"?pos={after_room_token}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -3476,22 +3567,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
|
||||
# The returned state doesn't change from initial to incremental sync. In the
|
||||
# future, we will only return updates but only if we've sent the room down the
|
||||
# We only return updates if we've sent the room down the
|
||||
# connection before.
|
||||
self._assertRequiredStateIncludes(
|
||||
channel.json_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Create, "")],
|
||||
state_map[(EventTypes.RoomHistoryVisibility, "")],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
|
||||
self.assertNotIn(room_id1, channel.json_body["rooms"])
|
||||
|
||||
def test_rooms_required_state_wildcard(self) -> None:
|
||||
"""
|
||||
@@ -3729,7 +3807,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
user3_id = self.register_user("user3", "pass")
|
||||
user3_tok = self.login(user3_id, "pass")
|
||||
|
||||
from_token = self.event_sources.get_current_token()
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [],
|
||||
"timeline_limit": 4,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
from_token = channel.json_body["pos"]
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
@@ -3767,8 +3860,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
# Make the Sliding Sync request with lazy loading for the room members
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint
|
||||
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
|
||||
self.sync_endpoint + f"?pos={from_token}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -4230,6 +4322,187 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
|
||||
channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
|
||||
)
|
||||
|
||||
def test_incremental_sync_incremental_state(self) -> None:
|
||||
"""Test that we only get state updates in incremental sync for rooms
|
||||
we've already seen.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Create, ""],
|
||||
[EventTypes.RoomHistoryVisibility, ""],
|
||||
# This one doesn't exist in the room
|
||||
[EventTypes.Name, ""],
|
||||
],
|
||||
"timeline_limit": 0,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
from_token = channel.json_body["pos"]
|
||||
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
|
||||
self._assertRequiredStateIncludes(
|
||||
channel.json_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Create, "")],
|
||||
state_map[(EventTypes.RoomHistoryVisibility, "")],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Send a state event
|
||||
self.helper.send_state(
|
||||
room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok
|
||||
)
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint + f"?pos={from_token}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Create, ""],
|
||||
[EventTypes.RoomHistoryVisibility, ""],
|
||||
[EventTypes.Name, ""],
|
||||
],
|
||||
"timeline_limit": 0,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
|
||||
self._assertRequiredStateIncludes(
|
||||
channel.json_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Name, "")],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_incremental_sync_full_state_new_room(self) -> None:
|
||||
"""Test that we get state all state in incremental sync for rooms that
|
||||
we haven't seen before.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id2, user1_id, tok=user1_tok)
|
||||
|
||||
# Make the Sliding Sync request, we'll only receive room_id2
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint,
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 0]],
|
||||
"required_state": [
|
||||
[EventTypes.Create, ""],
|
||||
[EventTypes.RoomHistoryVisibility, ""],
|
||||
# This one doesn't exist in the room
|
||||
[EventTypes.Name, ""],
|
||||
],
|
||||
"timeline_limit": 0,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
from_token = channel.json_body["pos"]
|
||||
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id2)
|
||||
)
|
||||
|
||||
self._assertRequiredStateIncludes(
|
||||
channel.json_body["rooms"][room_id2]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Create, "")],
|
||||
state_map[(EventTypes.RoomHistoryVisibility, "")],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
self.assertNotIn(room_id1, channel.json_body["rooms"])
|
||||
|
||||
# Send a state event in room 1
|
||||
self.helper.send_state(
|
||||
room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok
|
||||
)
|
||||
|
||||
# We should get room_id1 down sync, with the full state.
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.sync_endpoint + f"?pos={from_token}",
|
||||
{
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 0]],
|
||||
"required_state": [
|
||||
[EventTypes.Create, ""],
|
||||
[EventTypes.RoomHistoryVisibility, ""],
|
||||
[EventTypes.Name, ""],
|
||||
],
|
||||
"timeline_limit": 0,
|
||||
}
|
||||
}
|
||||
},
|
||||
access_token=user1_tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
|
||||
self._assertRequiredStateIncludes(
|
||||
channel.json_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Create, "")],
|
||||
state_map[(EventTypes.RoomHistoryVisibility, "")],
|
||||
state_map[(EventTypes.Name, "")],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
|
||||
class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
|
||||
"""Tests for the to-device sliding sync extension"""