Compare commits

...

25 Commits

Author SHA1 Message Date
Erik Johnston
52f4253c50 Improve comment 2024-08-20 00:40:08 +01:00
Erik Johnston
2bba63ef21 Replace initial=true with unstable_expanded_timeline=true 2024-08-20 00:15:44 +01:00
Erik Johnston
ba4e63b948 Add comment explaining the odd behaviour 2024-08-19 23:35:21 +01:00
Erik Johnston
299ab1b945 Use timelime_limit not len(timeline) 2024-08-19 23:30:00 +01:00
Erik Johnston
49c4645ab6 Remove double insertion 2024-08-19 23:29:26 +01:00
Erik Johnston
0e8feedc8d Remove spurious set_tag 2024-08-19 23:22:37 +01:00
Erik Johnston
a4ad443bbf Use test helpers 2024-08-19 23:22:14 +01:00
Erik Johnston
891ce47ab0 Rename previous_room_configs 2024-08-19 23:17:18 +01:00
Erik Johnston
a63261d83a Restore comments 2024-08-19 21:31:56 +01:00
Erik Johnston
768d150b04 Add docstring 2024-08-19 21:31:36 +01:00
Erik Johnston
33ec15b62e Restore comments 2024-08-19 21:31:32 +01:00
Erik Johnston
aea946be8b Merge remote-tracking branch 'origin/develop' into erikj/ss_room_sub2 2024-08-19 21:25:50 +01:00
Erik Johnston
b23231e9e4 Newsfile 2024-08-16 14:05:33 +01:00
Erik Johnston
009af0e560 Handle timeline_limit changes 2024-08-16 14:00:07 +01:00
Erik Johnston
ee6efa2c66 Track room configs in per-connection state 2024-08-16 13:59:56 +01:00
Erik Johnston
70d32fba83 Add proper DB function for getting receipts between things 2024-08-15 14:31:31 +01:00
Erik Johnston
100927dde1 Comments 2024-08-15 13:15:49 +01:00
Erik Johnston
614c0d73de Newsfile 2024-08-15 12:42:07 +01:00
Erik Johnston
55feaae9ea Add tests 2024-08-15 12:34:17 +01:00
Erik Johnston
6b9d24451f Record state 2024-08-15 09:57:48 +01:00
Erik Johnston
a1b75f76f7 WIP comments 2024-08-15 09:40:47 +01:00
Erik Johnston
c15b8b39cd WIP receipts reading 2024-08-15 09:40:47 +01:00
Erik Johnston
0561c86c5d Revamp 2024-08-15 09:25:23 +01:00
Erik Johnston
baac6c550e Record with new class 2024-08-14 18:47:56 +01:00
Erik Johnston
da5339dc54 Migrate to per-connection state class 2024-08-14 18:44:51 +01:00
5 changed files with 272 additions and 13 deletions

1
changelog.d/17579.misc Normal file
View File

@@ -0,0 +1 @@
Handle changes in `timeline_limit` in experimental sliding sync.

View File

@@ -787,7 +787,19 @@ class SlidingSyncHandler:
# subscription and have updates we need to send (i.e. either because
# we haven't sent the room down, or we have but there are missing
# updates).
for room_id in relevant_room_map:
for room_id, room_config in relevant_room_map.items():
prev_room_sync_config = previous_connection_state.room_configs.get(
room_id
)
if prev_room_sync_config is not None:
# Always include rooms whose timeline limit has increased.
if (
prev_room_sync_config.timeline_limit
< room_config.timeline_limit
):
rooms_should_send.add(room_id)
continue
status = previous_connection_state.rooms.have_sent_room(room_id)
if (
# The room was never sent down before so the client needs to know
@@ -819,12 +831,15 @@ class SlidingSyncHandler:
if room_id in rooms_should_send
}
new_connection_state = previous_connection_state.get_mutable()
@trace
@tag_args
async def handle_room(room_id: str) -> None:
room_sync_result = await self.get_room_sync_data(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
room_id=room_id,
room_sync_config=relevant_rooms_to_send_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -842,8 +857,6 @@ class SlidingSyncHandler:
with start_active_span("sliding_sync.generate_room_entries"):
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)
new_connection_state = previous_connection_state.get_mutable()
extensions = await self.get_extensions_response(
sync_config=sync_config,
actual_lists=lists,
@@ -1955,6 +1968,7 @@ class SlidingSyncHandler:
self,
sync_config: SlidingSyncConfig,
previous_connection_state: "PerConnectionState",
new_connection_state: "MutablePerConnectionState",
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
@@ -1998,9 +2012,21 @@ class SlidingSyncHandler:
# - For an incremental sync where we haven't sent it down this
# connection before
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
# Relevant spec issue:
# https://github.com/matrix-org/matrix-spec/issues/1917
#
# XXX: Odd behavior - We also check if the `timeline_limit` has
# increased, if so we ignore the from bound for the timeline to send
# down a larger chunk of history and set `unstable_expanded_timeline` to
# true. This is a bit different to the behavior of the Sliding Sync
# proxy (which sets initial=true, but then doesn't send down the full
# state again), but existing apps, e.g. ElementX, just need `limited`
# set. In future this behavior is almost certainly going to change.
#
# TODO: Also handle changes to `required_state`
from_bound = None
initial = True
ignore_timeline_bound = False
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
@@ -2018,7 +2044,26 @@ class SlidingSyncHandler:
log_kv({"sliding_sync.room_status": room_status})
log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
if prev_room_sync_config is not None:
# Check if the timeline limit has increased, if so ignore the
# timeline bound and record the change (see "XXX: Odd behavior"
# above).
if (
prev_room_sync_config.timeline_limit
< room_sync_config.timeline_limit
):
ignore_timeline_bound = True
# TODO: Check for changes in `required_state``
log_kv(
{
"sliding_sync.from_bound": from_bound,
"sliding_sync.initial": initial,
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
}
)
# Assemble the list of timeline events
#
@@ -2055,6 +2100,10 @@ class SlidingSyncHandler:
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
timeline_from_bound = from_bound
if ignore_timeline_bound:
timeline_from_bound = None
# For initial `/sync` (and other historical scenarios mentioned above), we
# want to view a historical section of the timeline; to fetch events by
# `topological_ordering` (best representation of the room DAG as others were
@@ -2080,7 +2129,7 @@ class SlidingSyncHandler:
pagination_method: PaginateFunction = (
# Use `topographical_ordering` for historical events
paginate_room_events_by_topological_ordering
if from_bound is None
if timeline_from_bound is None
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
@@ -2090,7 +2139,7 @@ class SlidingSyncHandler:
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
to_key=timeline_from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
@@ -2448,6 +2497,49 @@ class SlidingSyncHandler:
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream
unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
if ignore_timeline_bound:
# FIXME: We signal the fact that we're sending down more events to
# the client by setting `unstable_expanded_timeline` to true (see
# "XXX: Odd behavior" above).
unstable_expanded_timeline = True
new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_config.required_state_map,
)
elif prev_room_sync_config is not None:
# If the result is limited then we need to record that the timeline
# limit has been reduced, as if the client later requests more
# timeline then we have more data to send.
#
# Otherwise we don't need to record that the timeline_limit has been
# reduced, as the *effective* timeline limit (i.e. the amount of
# timeline we have previously sent) is at least the previous
# timeline limit.
#
# This is to handle the case where the timeline limit e.g. goes from
# 10 to 5 to 10 again (without any timeline gaps), where there's no
# point sending down extra events when the timeline limit is
# increased as the client already has the 10 previous events.
# However, if is a gap (i.e. limited is True), then we *do* need to
# record the reduced timeline.
if (
limited
and prev_room_sync_config.timeline_limit
> room_sync_config.timeline_limit
):
new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_config.required_state_map,
)
# TODO: Record changes in required_state.
else:
new_connection_state.room_configs[room_id] = room_sync_config
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
return SlidingSyncResult.RoomResult(
@@ -2462,6 +2554,7 @@ class SlidingSyncHandler:
stripped_state=stripped_state,
prev_batch=prev_batch_token,
limited=limited,
unstable_expanded_timeline=unstable_expanded_timeline,
num_live=num_live,
bump_stamp=bump_stamp,
joined_count=room_membership_summary.get(
@@ -3262,16 +3355,30 @@ class PerConnectionState:
Attributes:
rooms: The status of each room for the events stream.
receipts: The status of each room for the receipts stream.
room_configs: Map from room_id to the `RoomSyncConfig` of all
rooms that we have previously sent down.
"""
rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
def get_mutable(self) -> "MutablePerConnectionState":
"""Get a mutable copy of this state."""
room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs)
return MutablePerConnectionState(
rooms=self.rooms.get_mutable(),
receipts=self.receipts.get_mutable(),
room_configs=ChainMap({}, room_configs),
)
def copy(self) -> "PerConnectionState":
return PerConnectionState(
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
room_configs=dict(self.room_configs),
)
@@ -3282,8 +3389,18 @@ class MutablePerConnectionState(PerConnectionState):
rooms: MutableRoomStatusMap[RoomStreamToken]
receipts: MutableRoomStatusMap[MultiWriterStreamToken]
room_configs: typing.ChainMap[str, RoomSyncConfig]
def has_updates(self) -> bool:
return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates())
return (
bool(self.rooms.get_updates())
or bool(self.receipts.get_updates())
or bool(self.get_room_config_updates())
)
def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
"""Get updates to the room sync config"""
return self.room_configs.maps[0]
@attr.s(auto_attribs=True)
@@ -3367,7 +3484,6 @@ class SlidingSyncConnectionStore:
) -> int:
"""Record updated per-connection state, returning the connection
position associated with the new state.
If there are no changes to the state this may return the same token as
the existing per-connection state.
"""
@@ -3388,10 +3504,7 @@ class SlidingSyncConnectionStore:
# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
# don't grow forever.
sync_statuses[new_store_token] = PerConnectionState(
rooms=new_connection_state.rooms.copy(),
receipts=new_connection_state.receipts.copy(),
)
sync_statuses[new_store_token] = new_connection_state.copy()
return new_store_token

View File

@@ -1044,6 +1044,11 @@ class SlidingSyncRestServlet(RestServlet):
if room_result.initial:
serialized_rooms[room_id]["initial"] = room_result.initial
if room_result.unstable_expanded_timeline:
serialized_rooms[room_id][
"unstable_expanded_timeline"
] = room_result.unstable_expanded_timeline
# This will be omitted for invite/knock rooms with `stripped_state`
if (
room_result.required_state is not None

View File

@@ -171,6 +171,9 @@ class SlidingSyncResult:
their local state. When there is an update, servers MUST omit this flag
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
absence of this flag means 'false'.
unstable_expanded_timeline: Flag which is set if we're returning more historic
events due to the timeline limit having increased. See "XXX: Odd behavior"
comment ing `synapse.handlers.sliding_sync`.
required_state: The current state of the room
timeline: Latest events in the room. The last event is the most recent.
bundled_aggregations: A mapping of event ID to the bundled aggregations for
@@ -219,6 +222,7 @@ class SlidingSyncResult:
heroes: Optional[List[StrippedHero]]
is_dm: bool
initial: bool
unstable_expanded_timeline: bool
# Should be empty for invite/knock rooms with `stripped_state`
required_state: List[EventBase]
# Should be empty for invite/knock rooms with `stripped_state`

View File

@@ -17,6 +17,7 @@ from typing import List, Optional
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.types import StreamToken, StrSequence
@@ -573,3 +574,138 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
# Nothing to see for this banned user in the room in the token range
self.assertIsNone(response_body["rooms"].get(room_id1))
def test_increasing_timeline_range_sends_more_messages(self) -> None:
"""
Test that increasing the timeline limit via room subscriptions sends the
room down with more messages in a limited sync.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [[EventTypes.Create, ""]],
"timeline_limit": 1,
}
}
}
message_events = []
for _ in range(10):
resp = self.helper.send(room_id1, "msg", tok=user1_tok)
message_events.append(resp["event_id"])
# Make the first Sliding Sync request
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
room_response = response_body["rooms"][room_id1]
self.assertEqual(room_response["initial"], True)
self.assertNotIn("unstable_expanded_timeline", room_response)
self.assertEqual(room_response["limited"], True)
# We only expect the last message at first
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=message_events[-1:],
message=str(room_response["timeline"]),
)
# We also expect to get the create event state.
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
room_response["required_state"],
{state_map[(EventTypes.Create, "")]},
exact=True,
)
# Now do another request with a room subscription with an increased timeline limit
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self.assertEqual(room_response["unstable_expanded_timeline"], True)
self.assertEqual(room_response["limited"], True)
# Now we expect all the messages
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=message_events,
message=str(room_response["timeline"]),
)
# We don't expect to get the room create down, as nothing has changed.
self.assertNotIn("required_state", room_response)
# Decreasing the timeline limit shouldn't resend any events
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 5,
}
}
event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
latest_event_id = event_response["event_id"]
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self.assertNotIn("unstable_expanded_timeline", room_response)
self.assertEqual(room_response["limited"], False)
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=[latest_event_id],
message=str(room_response["timeline"]),
)
# Increasing the limit to what it was before also should not resend any
# events
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}
event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
latest_event_id = event_response["event_id"]
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self.assertNotIn("unstable_expanded_timeline", room_response)
self.assertEqual(room_response["limited"], False)
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=[latest_event_id],
message=str(room_response["timeline"]),
)