Compare commits

...

15 Commits

Author SHA1 Message Date
Erik Johnston
234e4cb83d Only return changed rooms 2024-07-17 15:27:37 +01:00
Erik Johnston
27848818c2 Add tests 2024-07-17 14:27:55 +01:00
Erik Johnston
2968f2e3b8 Bump typing-extensions for 'assert_never' 2024-07-17 14:10:37 +01:00
Erik Johnston
a90c40812a Newsfile 2024-07-17 14:02:17 +01:00
Erik Johnston
e2c47bf4e8 Fix tests 2024-07-17 13:59:35 +01:00
Erik Johnston
de6e3bdee8 Handle state deltas in non-initial rooms 2024-07-17 13:59:35 +01:00
Erik Johnston
185831754e Handle initial flag correctly 2024-07-17 13:59:34 +01:00
Erik Johnston
e2a88e44ef Use new room store to track if we've sent a room down 2024-07-17 13:59:34 +01:00
Erik Johnston
53273db3e8 Add conn_id field 2024-07-17 13:55:49 +01:00
Erik Johnston
d44f7e12b1 WIP/PoC of storing whether we have sent rooms down to clients 2024-07-17 13:47:30 +01:00
Erik Johnston
f3a4cfb8b4 Newsfile 2024-07-17 12:16:54 +01:00
Erik Johnston
f3030af575 Fix to use new token format 2024-07-17 12:14:28 +01:00
Erik Johnston
e8df0d78a2 Don't create tokens manually in SSS tests 2024-07-17 12:14:28 +01:00
Erik Johnston
1ad1cce3f2 Pass throught SlidingSyncStreamToken 2024-07-17 12:14:28 +01:00
Erik Johnston
30263b43c2 Add SlidingSyncStreamToken 2024-07-17 12:14:28 +01:00
10 changed files with 739 additions and 85 deletions

View File

@@ -0,0 +1 @@
Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

1
changelog.d/17452.misc Normal file
View File

@@ -0,0 +1 @@
Change sliding sync to use their own token format in preparation for storing per-connection state.

View File

@@ -201,8 +201,8 @@ netaddr = ">=0.7.18"
# add a lower bound to the Jinja2 dependency.
Jinja2 = ">=3.0"
bleach = ">=1.4.3"
# We use `Self`, which were added in `typing-extensions` 4.0.
typing-extensions = ">=4.0"
# We use `assert_never`, which were added in `typing-extensions` 4.1.
typing-extensions = ">=4.1"
# We enforce that we have a `cryptography` version that bundles an `openssl`
# with the latest security patches.
cryptography = ">=3.4.7"

View File

@@ -18,11 +18,13 @@
#
#
import logging
from enum import Enum
from itertools import chain
from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple
import attr
from immutabledict import immutabledict
from typing_extensions import assert_never
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase
@@ -36,7 +38,9 @@ from synapse.types import (
PersistedEventPosition,
Requester,
RoomStreamToken,
SlidingSyncStreamToken,
StateMap,
StrCollection,
StreamKeyType,
StreamToken,
UserID,
@@ -339,11 +343,13 @@ class SlidingSyncHandler:
self.relations_handler = hs.get_relations_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
self.connection_store = SlidingSyncConnectionStore()
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SlidingSyncConfig,
from_token: Optional[StreamToken] = None,
from_token: Optional[SlidingSyncStreamToken] = None,
timeout_ms: int = 0,
) -> SlidingSyncResult:
"""
@@ -378,7 +384,7 @@ class SlidingSyncHandler:
# this returns false, it means we timed out waiting, and we should
# just return an empty response.
before_wait_ts = self.clock.time_msec()
if not await self.notifier.wait_for_stream_token(from_token):
if not await self.notifier.wait_for_stream_token(from_token.stream_token):
logger.warning(
"Timed out waiting for worker to catch up. Returning empty response"
)
@@ -416,7 +422,7 @@ class SlidingSyncHandler:
sync_config.user.to_string(),
timeout_ms,
current_sync_callback,
from_token=from_token,
from_token=from_token.stream_token,
)
return result
@@ -425,7 +431,7 @@ class SlidingSyncHandler:
self,
sync_config: SlidingSyncConfig,
to_token: StreamToken,
from_token: Optional[StreamToken] = None,
from_token: Optional[SlidingSyncStreamToken] = None,
) -> SlidingSyncResult:
"""
Generates the response body of a Sliding Sync result, represented as a
@@ -446,6 +452,12 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
await self.connection_store.mark_token_seen(
user_id,
conn_id=sync_config.connection_id(),
from_token=from_token,
)
# Get all of the room IDs that the user should be able to see in the sync
# response
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@@ -458,7 +470,7 @@ class SlidingSyncHandler:
await self.get_room_membership_for_user_at_to_token(
user=sync_config.user,
to_token=to_token,
from_token=from_token,
from_token=from_token.stream_token if from_token else None,
)
)
@@ -589,11 +601,36 @@ class SlidingSyncHandler:
else:
relevant_room_map[room_id] = room_sync_config
# Filter out rooms that haven't received updates and we've sent down
# previously.
if from_token:
rooms_should_send = set()
for room_id in relevant_room_map:
status = await self.connection_store.have_sent_room(
user_id,
sync_config.connection_id(),
from_token.connection_token,
room_id,
)
if status.status != HaveSentRoomFlag.LIVE:
rooms_should_send.add(room_id)
# TODO: Also check current state delta stream
rooms_that_have_updates = (
self.store._events_stream_cache.get_entities_changed(
relevant_room_map, from_token.stream_token.room_key.stream
)
)
rooms_should_send.update(rooms_that_have_updates)
relevant_room_map = {
r: c for r, c in relevant_room_map.items() if r in rooms_should_send
}
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
for room_id, room_sync_config in relevant_room_map.items():
room_sync_result = await self.get_room_sync_data(
user=sync_config.user,
sync_config=sync_config,
room_id=room_id,
room_sync_config=room_sync_config,
room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -609,8 +646,21 @@ class SlidingSyncHandler:
sync_config=sync_config, to_token=to_token
)
if has_lists or has_room_subscriptions:
connection_token = await self.connection_store.record_rooms(
user_id,
conn_id=sync_config.connection_id(),
from_token=from_token,
sent_room_ids=relevant_room_map.keys(),
unsent_room_ids=[], # TODO: We currently ssume that we have sent down all updates.
)
elif from_token:
connection_token = from_token.connection_token
else:
connection_token = 0
return SlidingSyncResult(
next_pos=to_token,
next_pos=SlidingSyncStreamToken(to_token, connection_token),
lists=lists,
rooms=rooms,
extensions=extensions,
@@ -1342,11 +1392,11 @@ class SlidingSyncHandler:
async def get_room_sync_data(
self,
user: UserID,
sync_config: SlidingSyncConfig,
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
from_token: Optional[StreamToken],
from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
) -> SlidingSyncResult.RoomResult:
"""
@@ -1364,6 +1414,38 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
"""
user = sync_config.user
# Determine whether we should limit the timeline to the token range.
#
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - For an incremental sync where we haven't sent it down this
# connection before
to_bound = None
initial = True
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = await self.connection_store.have_sent_room(
user_id=user.to_string(),
conn_id=sync_config.connection_id(),
connection_token=from_token.connection_token,
room_id=room_id,
)
if room_status.status == HaveSentRoomFlag.LIVE:
to_bound = from_token.stream_token.room_key
initial = False
elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
assert room_status.last_token is not None
to_bound = room_status.last_token
initial = False
elif room_status.status == HaveSentRoomFlag.NEVER:
to_bound = None
initial = True
else:
assert_never(room_status.status)
# Assemble the list of timeline events
#
@@ -1400,22 +1482,6 @@ class SlidingSyncHandler:
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
# Determine whether we should limit the timeline to the token range.
#
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - TODO: For an incremental sync where we haven't sent it down this
# connection before
to_bound = (
from_token.room_key
if from_token is not None
and not room_membership_for_user_at_to_token.newly_joined
else None
)
timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_bound,
@@ -1477,7 +1543,9 @@ class SlidingSyncHandler:
instance_name=timeline_event.internal_metadata.instance_name,
stream=timeline_event.internal_metadata.stream_ordering,
)
if persisted_position.persisted_after(from_token.room_key):
if persisted_position.persisted_after(
from_token.stream_token.room_key
):
num_live += 1
else:
# Since we're iterating over the timeline events in
@@ -1534,12 +1602,6 @@ class SlidingSyncHandler:
# indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`.
# TODO: Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the
# future), we're always returning the requested room state instead of
# updates.
initial = True
# Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at(
room_id=room_id,
@@ -1683,9 +1745,17 @@ class SlidingSyncHandler:
to_token=to_token,
)
else:
# TODO: Once we can figure out if we've sent a room down this connection before,
# we can return updates instead of the full required state.
raise NotImplementedError()
assert to_bound is not None
deltas = await self.store.get_current_state_deltas_for_room(
room_id, to_bound, to_token.room_key
)
# TODO: Filter room state before fetching events
# TODO: Handle state resets where event_id is None
events = await self.store.get_events(
[d.event_id for d in deltas if d.event_id]
)
room_state = {(s.type, s.state_key): s for s in events.values()}
required_room_state: StateMap[EventBase] = {}
if required_state_filter != StateFilter.none():
@@ -1799,7 +1869,7 @@ class SlidingSyncHandler:
"""
user_id = sync_config.user.to_string()
device_id = sync_config.device_id
device_id = sync_config.requester.device_id
# Check that this request has a valid device ID (not all requests have
# to belong to a device, and so device_id is None), and that the
@@ -1855,3 +1925,198 @@ class SlidingSyncHandler:
next_batch=f"{stream_id}",
events=messages,
)
class HaveSentRoomFlag(Enum):
"""Flag for whether we have sent the room down a sliding sync connection.
The valid state changes here are:
NEVER -> LIVE
LIVE -> PREVIOUSLY
PREVIOUSLY -> LIVE
"""
# The room has never been sent down (or we have forgotten we have sent it
# down).
NEVER = 1
# We have previously sent the room down, but there are updates that we
# haven't sent down.
PREVIOUSLY = 2
# We have sent the room down and the client has received all updates.
LIVE = 3
@attr.s(auto_attribs=True, slots=True, frozen=True)
class HaveSentRoom:
"""Whether we have sent the room down a sliding sync connection.
Attributes:
status: Flag of if we have or haven't sent down the room
last_token: If the flag is `PREVIOUSLY` then this is non-null and
contains the last stream token of the last updates we sent down
the room, i.e. we still need to send everything since then to the
client.
"""
status: HaveSentRoomFlag
last_token: Optional[RoomStreamToken]
@staticmethod
def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
"""Constructor for `PREVIOUSLY` flag."""
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
@attr.s(auto_attribs=True)
class SlidingSyncConnectionStore:
"""In-memory store of per-connection state, including what rooms we have
previously sent down a sliding sync connection.
Note: This is NOT safe to run in a worker setup.
The complication here is that we need to handle requests being resent, i.e.
if we sent down a room in a response that the client received, we must
consider the room *not* sent when we get the request again.
This is handled by using an integer "token", which is returned to the client
as part of the sync token. For each connection we store a mapping from
tokens to the room states, and create a new entry when we send down new
rooms.
Note that for any given sliding sync connection we will only store a maximum
of two different tokens: the previous token from the request and a new token
sent in the response. When we receive a request with a given token, we then
clear out all other entries with a different token.
Attributes:
_connections: Mapping from `(user_id, conn_id)` to mapping of `token`
to mapping of room ID to `HaveSentRoom`.
"""
# `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
_connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
attr.Factory(dict)
)
async def have_sent_room(
self, user_id: str, conn_id: str, connection_token: int, room_id: str
) -> HaveSentRoom:
"""Whether for the given user_id/conn_id/token, return whether we have
previously sent the room down
"""
sync_statuses = self._connections.setdefault((user_id, conn_id), {})
room_status = sync_statuses.get(connection_token, {}).get(
room_id, HAVE_SENT_ROOM_NEVER
)
return room_status
async def record_rooms(
self,
user_id: str,
conn_id: str,
from_token: Optional[SlidingSyncStreamToken],
*,
sent_room_ids: StrCollection,
unsent_room_ids: StrCollection,
) -> int:
"""Record which rooms we have/haven't sent down in a new response
Attributes:
user_id
conn_id
from_token: The since token from the request, if any
sent_room_ids: The set of room IDs that we have sent down as
part of this request (only needs to be ones we didn't
previously sent down).
unsent_room_ids: The set of room IDs that have had updates
since the `last_room_token`, but which were not included in
this request
"""
prev_connection_token = 0
if from_token is not None:
prev_connection_token = from_token.connection_token
# If there are no changes then this is a noop.
if not sent_room_ids and not unsent_room_ids:
return prev_connection_token
sync_statuses = self._connections.setdefault((user_id, conn_id), {})
# Generate a new token, removing any existing entries in that token
# (which can happen if requests get resent).
new_store_token = prev_connection_token + 1
sync_statuses.pop(new_store_token, None)
# Copy over and update the room mappings.
new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
# Whether we have updated the `new_room_statuses`, if we don't by the
# end we can treat this as a noop.
have_updated = False
for room_id in sent_room_ids:
new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
have_updated = True
# Whether we add/update the entries for unsent rooms depends on the
# existing entry:
# - LIVE: We have previously sent down everything up to
# `last_room_token, so we update the entry to be `PREVIOUSLY` with
# `last_room_token`.
# - PREVIOUSLY: We have previously sent down everything up to *a*
# given token, so we don't need to update the entry.
# - NEVER: We have never previously sent down the room, and we haven't
# sent anything down this time either so we leave it as NEVER.
# Work out the new state for unsent rooms that were `LIVE`.
if from_token:
new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
else:
new_unsent_state = HAVE_SENT_ROOM_NEVER
for room_id in unsent_room_ids:
prev_state = new_room_statuses.get(room_id)
if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
new_room_statuses[room_id] = new_unsent_state
have_updated = True
if not have_updated:
return prev_connection_token
sync_statuses[new_store_token] = new_room_statuses
return new_store_token
async def mark_token_seen(
self,
user_id: str,
conn_id: str,
from_token: Optional[SlidingSyncStreamToken],
) -> None:
"""We have received a request with the given token, so we can clear out
any other tokens associated with the connection.
If there is no from token then we have started afresh, and so we delete
all tokens associated with the device.
"""
# Clear out any tokens for the connection that doesn't match the one
# from the request.
sync_statuses = self._connections.pop((user_id, conn_id), {})
if from_token is None:
return
sync_statuses = {
i: room_statuses
for i, room_statuses in sync_statuses.items()
if i == from_token.connection_token
}
if sync_statuses:
self._connections[(user_id, conn_id)] = sync_statuses

View File

@@ -54,7 +54,7 @@ from synapse.http.servlet import (
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace_with_opname
from synapse.rest.admin.experimental_features import ExperimentalFeature
from synapse.types import JsonDict, Requester, StreamToken
from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
from synapse.types.rest.client import SlidingSyncBody
from synapse.util import json_decoder
from synapse.util.caches.lrucache import LruCache
@@ -881,7 +881,6 @@ class SlidingSyncRestServlet(RestServlet):
)
user = requester.user
device_id = requester.device_id
timeout = parse_integer(request, "timeout", default=0)
# Position in the stream
@@ -889,7 +888,9 @@ class SlidingSyncRestServlet(RestServlet):
from_token = None
if from_token_string is not None:
from_token = await StreamToken.from_string(self.store, from_token_string)
from_token = await SlidingSyncStreamToken.from_string(
self.store, from_token_string
)
# TODO: We currently don't know whether we're going to use sticky params or
# maybe some filters like sync v2 where they are built up once and referenced
@@ -900,11 +901,12 @@ class SlidingSyncRestServlet(RestServlet):
sync_config = SlidingSyncConfig(
user=user,
device_id=device_id,
requester=requester,
# FIXME: Currently, we're just manually copying the fields from the
# `SlidingSyncBody` into the config. How can we gurantee into the future
# `SlidingSyncBody` into the config. How can we guarantee into the future
# that we don't forget any? I would like something more structured like
# `copy_attributes(from=body, to=config)`
conn_id=body.conn_id,
lists=body.lists,
room_subscriptions=body.room_subscriptions,
extensions=body.extensions,

View File

@@ -26,6 +26,8 @@ import attr
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.stream import _filter_results_by_stream
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__)
@@ -156,3 +158,39 @@ class StateDeltasStore(SQLBaseStore):
"get_max_stream_id_in_current_state_deltas",
self._get_max_stream_id_in_current_state_deltas_txn,
)
async def get_current_state_deltas_for_room(
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
) -> List[StateDelta]:
"""Get the state deltas between that have happened between two
tokens."""
def get_current_state_deltas_for_room_txn(
txn: LoggingTransaction,
) -> List[StateDelta]:
sql = """
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
FROM current_state_delta_stream
WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
"""
txn.execute(
sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
)
return [
StateDelta(
stream_id=row[1],
room_id=room_id,
event_type=row[2],
state_key=row[3],
event_id=row[4],
prev_event_id=row[5],
)
for row in txn
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
]
return await self.db_pool.runInteraction(
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
)

View File

@@ -1137,6 +1137,43 @@ StreamToken.START = StreamToken(
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingSyncStreamToken:
"""The same as a `StreamToken`, but includes an extra field at the start for
the sliding sync connection token (separated by a '/'). This is used to
store per-connection state.
This then looks something like:
5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379
"""
stream_token: StreamToken
connection_token: int
@staticmethod
@cancellable
async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken":
"""Creates a SlidingSyncStreamToken from its textual representation."""
try:
connection_token_str, stream_token_str = string.split("/", 1)
connection_token = int(connection_token_str)
stream_token = await StreamToken.from_string(store, stream_token_str)
return SlidingSyncStreamToken(
stream_token=stream_token,
connection_token=connection_token,
)
except CancelledError:
raise
except Exception:
raise SynapseError(400, "Invalid stream token")
async def to_string(self, store: "DataStore") -> str:
"""Serializes the token to a string"""
stream_token_str = await self.stream_token.to_string(store)
return f"{self.connection_token}/{stream_token_str}"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class PersistedPosition:
"""Position of a newly persisted row with instance that persisted it."""

View File

@@ -31,7 +31,14 @@ else:
from pydantic import Extra
from synapse.events import EventBase
from synapse.types import JsonDict, JsonMapping, StreamToken, UserID
from synapse.types import (
JsonDict,
JsonMapping,
Requester,
SlidingSyncStreamToken,
StreamToken,
UserID,
)
from synapse.types.rest.client import SlidingSyncBody
if TYPE_CHECKING:
@@ -102,7 +109,7 @@ class SlidingSyncConfig(SlidingSyncBody):
"""
user: UserID
device_id: Optional[str]
requester: Requester
# Pydantic config
class Config:
@@ -113,6 +120,31 @@ class SlidingSyncConfig(SlidingSyncBody):
# Allow custom types like `UserID` to be used in the model
arbitrary_types_allowed = True
def connection_id(self) -> str:
"""Return a string identifier for this connection. May clash with
connection IDs from different users.
This is generally a combination of device ID and conn_id. However, both
these two are optional (e.g. puppet access tokens don't have device
IDs), so this handles those edge cases.
"""
# `conn_id` can be null, in which case we default to the empty string
# (if conn ID is empty then the client can't have multiple sync loops)
conn_id = self.conn_id or ""
if self.requester.device_id:
return f"D/{self.requester.device_id}/{conn_id}"
if self.requester.access_token_id:
# If we don't have a device, then the access token ID should be a
# stable ID.
return f"A/{self.requester.access_token_id}/{conn_id}"
# If we have neither then its likely an AS or some weird token. Either
# way we can just fail here.
raise Exception("Cannot use sliding sync with access token type")
class OperationType(Enum):
"""
@@ -287,7 +319,7 @@ class SlidingSyncResult:
def __bool__(self) -> bool:
return bool(self.to_device)
next_pos: StreamToken
next_pos: SlidingSyncStreamToken
lists: Dict[str, SlidingWindowList]
rooms: Dict[str, RoomResult]
extensions: Extensions
@@ -300,7 +332,7 @@ class SlidingSyncResult:
return bool(self.lists or self.rooms or self.extensions)
@staticmethod
def empty(next_pos: StreamToken) -> "SlidingSyncResult":
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
"Return a new empty result"
return SlidingSyncResult(
next_pos=next_pos,

View File

@@ -120,6 +120,9 @@ class SlidingSyncBody(RequestBodyModel):
Sliding Sync API request body.
Attributes:
conn_id: An optional string to identify this connection to the server. If this
is missing, only 1 sliding sync connection can be made to the server at
any one time.
lists: Sliding window API. A map of list key to list information
(:class:`SlidingSyncList`). Max lists: 100. The list keys should be
arbitrary strings which the client is using to refer to the list. Keep this
@@ -315,6 +318,8 @@ class SlidingSyncBody(RequestBodyModel):
to_device: Optional[ToDeviceExtension] = None
conn_id: Optional[str]
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
if TYPE_CHECKING:
lists: Optional[Dict[str, SlidingSyncList]] = None

View File

@@ -50,7 +50,14 @@ from synapse.rest.client import (
sync,
)
from synapse.server import HomeServer
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID
from synapse.types import (
JsonDict,
RoomStreamToken,
SlidingSyncStreamToken,
StreamKeyType,
StreamToken,
UserID,
)
from synapse.util import Clock
from tests import unittest
@@ -1448,7 +1455,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
)
future_position_token_serialized = self.get_success(
future_position_token.to_string(self.store)
SlidingSyncStreamToken(future_position_token, 0).to_string(self.store)
)
# Make the Sliding Sync request
@@ -2608,7 +2615,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
room_id1, "activity before token2", tok=user2_tok
)
from_token = self.event_sources.get_current_token()
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 4,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
from_token = channel.json_body["pos"]
# Join the room after the `from_token` which will make us consider this room as
# `newly_joined`.
@@ -2630,8 +2652,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
# Make an incremental Sliding Sync request (what we're trying to test)
channel = self.make_request(
"POST",
self.sync_endpoint
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
self.sync_endpoint + f"?pos={from_token}",
{
"lists": {
"foo-list": {
@@ -2817,7 +2838,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
from_token = self.event_sources.get_current_token()
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 4,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
from_token = channel.json_body["pos"]
self.helper.send(room_id1, "activity after token5", tok=user2_tok)
self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
@@ -2825,8 +2861,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
self.sync_endpoint + f"?pos={from_token}",
{
"lists": {
"foo-list": {
@@ -3074,7 +3109,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
from_token = self.event_sources.get_current_token()
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 4,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
from_token = channel.json_body["pos"]
self.helper.send(room_id1, "activity after token5", tok=user2_tok)
self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
@@ -3082,8 +3132,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
self.sync_endpoint + f"?pos={from_token}",
{
"lists": {
"foo-list": {
@@ -3239,7 +3288,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
self.helper.send(room_id1, "activity before2", tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
from_token = self.event_sources.get_current_token()
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 4,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
from_token = channel.json_body["pos"]
event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
@@ -3255,8 +3319,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
self.sync_endpoint + f"?pos={from_token}",
{
"lists": {
"foo-list": {
@@ -3316,15 +3379,29 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
self.helper.send(room_id1, "activity after3", tok=user2_tok)
from_token = self.event_sources.get_current_token()
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 4,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
from_token = channel.json_body["pos"]
self.helper.send(room_id1, "activity after4", tok=user2_tok)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
self.sync_endpoint + f"?pos={from_token}",
{
"lists": {
"foo-list": {
@@ -3451,13 +3528,27 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
after_room_token = self.event_sources.get_current_token()
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 4,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
after_room_token = channel.json_body["pos"]
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint
+ f"?pos={self.get_success(after_room_token.to_string(self.store))}",
self.sync_endpoint + f"?pos={after_room_token}",
{
"lists": {
"foo-list": {
@@ -3476,22 +3567,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
# The returned state doesn't change from initial to incremental sync. In the
# future, we will only return updates but only if we've sent the room down the
# We only return updates but only if we've sent the room down the
# connection before.
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
},
exact=True,
)
self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
self.assertNotIn(room_id1, channel.json_body["rooms"])
def test_rooms_required_state_wildcard(self) -> None:
"""
@@ -3729,7 +3807,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
user3_id = self.register_user("user3", "pass")
user3_tok = self.login(user3_id, "pass")
from_token = self.event_sources.get_current_token()
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 4,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
from_token = channel.json_body["pos"]
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
@@ -3767,8 +3860,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
# Make the Sliding Sync request with lazy loading for the room members
channel = self.make_request(
"POST",
self.sync_endpoint
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
self.sync_endpoint + f"?pos={from_token}",
{
"lists": {
"foo-list": {
@@ -4230,6 +4322,187 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
)
def test_incremental_sync_incremental_state(self) -> None:
"""Test that we only get state updates in incremental sync for rooms
we've already seen.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.RoomHistoryVisibility, ""],
# This one doesn't exist in the room
[EventTypes.Name, ""],
],
"timeline_limit": 0,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
from_token = channel.json_body["pos"]
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
},
exact=True,
)
# Send a state event
self.helper.send_state(
room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok
)
channel = self.make_request(
"POST",
self.sync_endpoint + f"?pos={from_token}",
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.RoomHistoryVisibility, ""],
[EventTypes.Name, ""],
],
"timeline_limit": 0,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Name, "")],
},
exact=True,
)
def test_incremental_sync_full_state_new_room(self) -> None:
"""Test that we get state all state in incremental sync for rooms that
we haven't seen before.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id2, user1_id, tok=user1_tok)
# Make the Sliding Sync request, we'll only receive room_id2
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.RoomHistoryVisibility, ""],
# This one doesn't exist in the room
[EventTypes.Name, ""],
],
"timeline_limit": 0,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
from_token = channel.json_body["pos"]
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id2)
)
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id2]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
},
exact=True,
)
self.assertNotIn(room_id1, channel.json_body["rooms"])
# Send a state event in room 1
self.helper.send_state(
room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok
)
# We should get room_id1 down sync, with the full state.
channel = self.make_request(
"POST",
self.sync_endpoint + f"?pos={from_token}",
{
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.RoomHistoryVisibility, ""],
[EventTypes.Name, ""],
],
"timeline_limit": 0,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
state_map[(EventTypes.Name, "")],
},
exact=True,
)
class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
"""Tests for the to-device sliding sync extension"""