mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
25 Commits
madlittlem
...
erikj/ss_r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
52f4253c50 | ||
|
|
2bba63ef21 | ||
|
|
ba4e63b948 | ||
|
|
299ab1b945 | ||
|
|
49c4645ab6 | ||
|
|
0e8feedc8d | ||
|
|
a4ad443bbf | ||
|
|
891ce47ab0 | ||
|
|
a63261d83a | ||
|
|
768d150b04 | ||
|
|
33ec15b62e | ||
|
|
aea946be8b | ||
|
|
b23231e9e4 | ||
|
|
009af0e560 | ||
|
|
ee6efa2c66 | ||
|
|
70d32fba83 | ||
|
|
100927dde1 | ||
|
|
614c0d73de | ||
|
|
55feaae9ea | ||
|
|
6b9d24451f | ||
|
|
a1b75f76f7 | ||
|
|
c15b8b39cd | ||
|
|
0561c86c5d | ||
|
|
baac6c550e | ||
|
|
da5339dc54 |
1
changelog.d/17579.misc
Normal file
1
changelog.d/17579.misc
Normal file
@@ -0,0 +1 @@
|
||||
Handle changes in `timeline_limit` in experimental sliding sync.
|
||||
@@ -787,7 +787,19 @@ class SlidingSyncHandler:
|
||||
# subscription and have updates we need to send (i.e. either because
|
||||
# we haven't sent the room down, or we have but there are missing
|
||||
# updates).
|
||||
for room_id in relevant_room_map:
|
||||
for room_id, room_config in relevant_room_map.items():
|
||||
prev_room_sync_config = previous_connection_state.room_configs.get(
|
||||
room_id
|
||||
)
|
||||
if prev_room_sync_config is not None:
|
||||
# Always include rooms whose timeline limit has increased.
|
||||
if (
|
||||
prev_room_sync_config.timeline_limit
|
||||
< room_config.timeline_limit
|
||||
):
|
||||
rooms_should_send.add(room_id)
|
||||
continue
|
||||
|
||||
status = previous_connection_state.rooms.have_sent_room(room_id)
|
||||
if (
|
||||
# The room was never sent down before so the client needs to know
|
||||
@@ -819,12 +831,15 @@ class SlidingSyncHandler:
|
||||
if room_id in rooms_should_send
|
||||
}
|
||||
|
||||
new_connection_state = previous_connection_state.get_mutable()
|
||||
|
||||
@trace
|
||||
@tag_args
|
||||
async def handle_room(room_id: str) -> None:
|
||||
room_sync_result = await self.get_room_sync_data(
|
||||
sync_config=sync_config,
|
||||
previous_connection_state=previous_connection_state,
|
||||
new_connection_state=new_connection_state,
|
||||
room_id=room_id,
|
||||
room_sync_config=relevant_rooms_to_send_map[room_id],
|
||||
room_membership_for_user_at_to_token=room_membership_for_user_map[
|
||||
@@ -842,8 +857,6 @@ class SlidingSyncHandler:
|
||||
with start_active_span("sliding_sync.generate_room_entries"):
|
||||
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)
|
||||
|
||||
new_connection_state = previous_connection_state.get_mutable()
|
||||
|
||||
extensions = await self.get_extensions_response(
|
||||
sync_config=sync_config,
|
||||
actual_lists=lists,
|
||||
@@ -1955,6 +1968,7 @@ class SlidingSyncHandler:
|
||||
self,
|
||||
sync_config: SlidingSyncConfig,
|
||||
previous_connection_state: "PerConnectionState",
|
||||
new_connection_state: "MutablePerConnectionState",
|
||||
room_id: str,
|
||||
room_sync_config: RoomSyncConfig,
|
||||
room_membership_for_user_at_to_token: _RoomMembershipForUser,
|
||||
@@ -1998,9 +2012,21 @@ class SlidingSyncHandler:
|
||||
# - For an incremental sync where we haven't sent it down this
|
||||
# connection before
|
||||
#
|
||||
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
|
||||
# Relevant spec issue:
|
||||
# https://github.com/matrix-org/matrix-spec/issues/1917
|
||||
#
|
||||
# XXX: Odd behavior - We also check if the `timeline_limit` has
|
||||
# increased, if so we ignore the from bound for the timeline to send
|
||||
# down a larger chunk of history and set `unstable_expanded_timeline` to
|
||||
# true. This is a bit different to the behavior of the Sliding Sync
|
||||
# proxy (which sets initial=true, but then doesn't send down the full
|
||||
# state again), but existing apps, e.g. ElementX, just need `limited`
|
||||
# set. In future this behavior is almost certainly going to change.
|
||||
#
|
||||
# TODO: Also handle changes to `required_state`
|
||||
from_bound = None
|
||||
initial = True
|
||||
ignore_timeline_bound = False
|
||||
if from_token and not room_membership_for_user_at_to_token.newly_joined:
|
||||
room_status = previous_connection_state.rooms.have_sent_room(room_id)
|
||||
if room_status.status == HaveSentRoomFlag.LIVE:
|
||||
@@ -2018,7 +2044,26 @@ class SlidingSyncHandler:
|
||||
|
||||
log_kv({"sliding_sync.room_status": room_status})
|
||||
|
||||
log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
|
||||
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
|
||||
if prev_room_sync_config is not None:
|
||||
# Check if the timeline limit has increased, if so ignore the
|
||||
# timeline bound and record the change (see "XXX: Odd behavior"
|
||||
# above).
|
||||
if (
|
||||
prev_room_sync_config.timeline_limit
|
||||
< room_sync_config.timeline_limit
|
||||
):
|
||||
ignore_timeline_bound = True
|
||||
|
||||
# TODO: Check for changes in `required_state`
|
||||
|
||||
log_kv(
|
||||
{
|
||||
"sliding_sync.from_bound": from_bound,
|
||||
"sliding_sync.initial": initial,
|
||||
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
|
||||
}
|
||||
)
|
||||
|
||||
# Assemble the list of timeline events
|
||||
#
|
||||
@@ -2055,6 +2100,10 @@ class SlidingSyncHandler:
|
||||
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
|
||||
)
|
||||
|
||||
timeline_from_bound = from_bound
|
||||
if ignore_timeline_bound:
|
||||
timeline_from_bound = None
|
||||
|
||||
# For initial `/sync` (and other historical scenarios mentioned above), we
|
||||
# want to view a historical section of the timeline; to fetch events by
|
||||
# `topological_ordering` (best representation of the room DAG as others were
|
||||
@@ -2080,7 +2129,7 @@ class SlidingSyncHandler:
|
||||
pagination_method: PaginateFunction = (
|
||||
# Use `topographical_ordering` for historical events
|
||||
paginate_room_events_by_topological_ordering
|
||||
if from_bound is None
|
||||
if timeline_from_bound is None
|
||||
# Use `stream_ordering` for updates
|
||||
else paginate_room_events_by_stream_ordering
|
||||
)
|
||||
@@ -2090,7 +2139,7 @@ class SlidingSyncHandler:
|
||||
# (from newer to older events) starting at to_bound.
|
||||
# This ensures we fill the `limit` with the newest events first,
|
||||
from_key=to_bound,
|
||||
to_key=from_bound,
|
||||
to_key=timeline_from_bound,
|
||||
direction=Direction.BACKWARDS,
|
||||
# We add one so we can determine if there are enough events to saturate
|
||||
# the limit or not (see `limited`)
|
||||
@@ -2448,6 +2497,49 @@ class SlidingSyncHandler:
|
||||
if new_bump_event_pos.stream > 0:
|
||||
bump_stamp = new_bump_event_pos.stream
|
||||
|
||||
unstable_expanded_timeline = False
|
||||
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
|
||||
if ignore_timeline_bound:
|
||||
# FIXME: We signal the fact that we're sending down more events to
|
||||
# the client by setting `unstable_expanded_timeline` to true (see
|
||||
# "XXX: Odd behavior" above).
|
||||
unstable_expanded_timeline = True
|
||||
|
||||
new_connection_state.room_configs[room_id] = RoomSyncConfig(
|
||||
timeline_limit=room_sync_config.timeline_limit,
|
||||
required_state_map=room_sync_config.required_state_map,
|
||||
)
|
||||
elif prev_room_sync_config is not None:
|
||||
# If the result is limited then we need to record that the timeline
|
||||
# limit has been reduced, as if the client later requests more
|
||||
# timeline then we have more data to send.
|
||||
#
|
||||
# Otherwise we don't need to record that the timeline_limit has been
|
||||
# reduced, as the *effective* timeline limit (i.e. the amount of
|
||||
# timeline we have previously sent) is at least the previous
|
||||
# timeline limit.
|
||||
#
|
||||
# This is to handle the case where the timeline limit e.g. goes from
|
||||
# 10 to 5 to 10 again (without any timeline gaps), where there's no
|
||||
# point sending down extra events when the timeline limit is
|
||||
# increased as the client already has the 10 previous events.
|
||||
# However, if there is a gap (i.e. limited is True), then we *do* need to
|
||||
# record the reduced timeline.
|
||||
if (
|
||||
limited
|
||||
and prev_room_sync_config.timeline_limit
|
||||
> room_sync_config.timeline_limit
|
||||
):
|
||||
new_connection_state.room_configs[room_id] = RoomSyncConfig(
|
||||
timeline_limit=room_sync_config.timeline_limit,
|
||||
required_state_map=room_sync_config.required_state_map,
|
||||
)
|
||||
|
||||
# TODO: Record changes in required_state.
|
||||
|
||||
else:
|
||||
new_connection_state.room_configs[room_id] = room_sync_config
|
||||
|
||||
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
|
||||
|
||||
return SlidingSyncResult.RoomResult(
|
||||
@@ -2462,6 +2554,7 @@ class SlidingSyncHandler:
|
||||
stripped_state=stripped_state,
|
||||
prev_batch=prev_batch_token,
|
||||
limited=limited,
|
||||
unstable_expanded_timeline=unstable_expanded_timeline,
|
||||
num_live=num_live,
|
||||
bump_stamp=bump_stamp,
|
||||
joined_count=room_membership_summary.get(
|
||||
@@ -3262,16 +3355,30 @@ class PerConnectionState:
|
||||
Attributes:
|
||||
rooms: The status of each room for the events stream.
|
||||
receipts: The status of each room for the receipts stream.
|
||||
room_configs: Map from room_id to the `RoomSyncConfig` of all
|
||||
rooms that we have previously sent down.
|
||||
"""
|
||||
|
||||
rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
|
||||
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
|
||||
|
||||
room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
|
||||
|
||||
def get_mutable(self) -> "MutablePerConnectionState":
|
||||
"""Get a mutable copy of this state."""
|
||||
room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs)
|
||||
|
||||
return MutablePerConnectionState(
|
||||
rooms=self.rooms.get_mutable(),
|
||||
receipts=self.receipts.get_mutable(),
|
||||
room_configs=ChainMap({}, room_configs),
|
||||
)
|
||||
|
||||
def copy(self) -> "PerConnectionState":
|
||||
return PerConnectionState(
|
||||
rooms=self.rooms.copy(),
|
||||
receipts=self.receipts.copy(),
|
||||
room_configs=dict(self.room_configs),
|
||||
)
|
||||
|
||||
|
||||
@@ -3282,8 +3389,18 @@ class MutablePerConnectionState(PerConnectionState):
|
||||
rooms: MutableRoomStatusMap[RoomStreamToken]
|
||||
receipts: MutableRoomStatusMap[MultiWriterStreamToken]
|
||||
|
||||
room_configs: typing.ChainMap[str, RoomSyncConfig]
|
||||
|
||||
def has_updates(self) -> bool:
|
||||
return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates())
|
||||
return (
|
||||
bool(self.rooms.get_updates())
|
||||
or bool(self.receipts.get_updates())
|
||||
or bool(self.get_room_config_updates())
|
||||
)
|
||||
|
||||
def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
|
||||
"""Get updates to the room sync config"""
|
||||
return self.room_configs.maps[0]
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
@@ -3367,7 +3484,6 @@ class SlidingSyncConnectionStore:
|
||||
) -> int:
|
||||
"""Record updated per-connection state, returning the connection
|
||||
position associated with the new state.
|
||||
|
||||
If there are no changes to the state this may return the same token as
|
||||
the existing per-connection state.
|
||||
"""
|
||||
@@ -3388,10 +3504,7 @@ class SlidingSyncConnectionStore:
|
||||
|
||||
# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
|
||||
# don't grow forever.
|
||||
sync_statuses[new_store_token] = PerConnectionState(
|
||||
rooms=new_connection_state.rooms.copy(),
|
||||
receipts=new_connection_state.receipts.copy(),
|
||||
)
|
||||
sync_statuses[new_store_token] = new_connection_state.copy()
|
||||
|
||||
return new_store_token
|
||||
|
||||
|
||||
@@ -1044,6 +1044,11 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
if room_result.initial:
|
||||
serialized_rooms[room_id]["initial"] = room_result.initial
|
||||
|
||||
if room_result.unstable_expanded_timeline:
|
||||
serialized_rooms[room_id][
|
||||
"unstable_expanded_timeline"
|
||||
] = room_result.unstable_expanded_timeline
|
||||
|
||||
# This will be omitted for invite/knock rooms with `stripped_state`
|
||||
if (
|
||||
room_result.required_state is not None
|
||||
|
||||
@@ -171,6 +171,9 @@ class SlidingSyncResult:
|
||||
their local state. When there is an update, servers MUST omit this flag
|
||||
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
|
||||
absence of this flag means 'false'.
|
||||
unstable_expanded_timeline: Flag which is set if we're returning more historic
|
||||
events due to the timeline limit having increased. See "XXX: Odd behavior"
|
||||
comment in `synapse.handlers.sliding_sync`.
|
||||
required_state: The current state of the room
|
||||
timeline: Latest events in the room. The last event is the most recent.
|
||||
bundled_aggregations: A mapping of event ID to the bundled aggregations for
|
||||
@@ -219,6 +222,7 @@ class SlidingSyncResult:
|
||||
heroes: Optional[List[StrippedHero]]
|
||||
is_dm: bool
|
||||
initial: bool
|
||||
unstable_expanded_timeline: bool
|
||||
# Should be empty for invite/knock rooms with `stripped_state`
|
||||
required_state: List[EventBase]
|
||||
# Should be empty for invite/knock rooms with `stripped_state`
|
||||
|
||||
@@ -17,6 +17,7 @@ from typing import List, Optional
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
import synapse.rest.admin
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.rest.client import login, room, sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import StreamToken, StrSequence
|
||||
@@ -573,3 +574,138 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
|
||||
|
||||
# Nothing to see for this banned user in the room in the token range
|
||||
self.assertIsNone(response_body["rooms"].get(room_id1))
|
||||
|
||||
def test_increasing_timeline_range_sends_more_messages(self) -> None:
|
||||
"""
|
||||
Test that increasing the timeline limit via room subscriptions sends the
|
||||
room down with more messages in a limited sync.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [[EventTypes.Create, ""]],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
message_events = []
|
||||
for _ in range(10):
|
||||
resp = self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||
message_events.append(resp["event_id"])
|
||||
|
||||
# Make the first Sliding Sync request
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
room_response = response_body["rooms"][room_id1]
|
||||
|
||||
self.assertEqual(room_response["initial"], True)
|
||||
self.assertNotIn("unstable_expanded_timeline", room_response)
|
||||
self.assertEqual(room_response["limited"], True)
|
||||
|
||||
# We only expect the last message at first
|
||||
self._assertTimelineEqual(
|
||||
room_id=room_id1,
|
||||
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
|
||||
expected_event_ids=message_events[-1:],
|
||||
message=str(room_response["timeline"]),
|
||||
)
|
||||
|
||||
# We also expect to get the create event state.
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
room_response["required_state"],
|
||||
{state_map[(EventTypes.Create, "")]},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Now do another request with a room subscription with an increased timeline limit
|
||||
sync_body["room_subscriptions"] = {
|
||||
room_id1: {
|
||||
"required_state": [],
|
||||
"timeline_limit": 10,
|
||||
}
|
||||
}
|
||||
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
room_response = response_body["rooms"][room_id1]
|
||||
|
||||
self.assertNotIn("initial", room_response)
|
||||
self.assertEqual(room_response["unstable_expanded_timeline"], True)
|
||||
self.assertEqual(room_response["limited"], True)
|
||||
|
||||
# Now we expect all the messages
|
||||
self._assertTimelineEqual(
|
||||
room_id=room_id1,
|
||||
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
|
||||
expected_event_ids=message_events,
|
||||
message=str(room_response["timeline"]),
|
||||
)
|
||||
|
||||
# We don't expect to get the room create down, as nothing has changed.
|
||||
self.assertNotIn("required_state", room_response)
|
||||
|
||||
# Decreasing the timeline limit shouldn't resend any events
|
||||
sync_body["room_subscriptions"] = {
|
||||
room_id1: {
|
||||
"required_state": [],
|
||||
"timeline_limit": 5,
|
||||
}
|
||||
}
|
||||
|
||||
event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||
latest_event_id = event_response["event_id"]
|
||||
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
room_response = response_body["rooms"][room_id1]
|
||||
|
||||
self.assertNotIn("initial", room_response)
|
||||
self.assertNotIn("unstable_expanded_timeline", room_response)
|
||||
self.assertEqual(room_response["limited"], False)
|
||||
|
||||
self._assertTimelineEqual(
|
||||
room_id=room_id1,
|
||||
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
|
||||
expected_event_ids=[latest_event_id],
|
||||
message=str(room_response["timeline"]),
|
||||
)
|
||||
|
||||
# Increasing the limit to what it was before also should not resend any
|
||||
# events
|
||||
sync_body["room_subscriptions"] = {
|
||||
room_id1: {
|
||||
"required_state": [],
|
||||
"timeline_limit": 10,
|
||||
}
|
||||
}
|
||||
|
||||
event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||
latest_event_id = event_response["event_id"]
|
||||
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
room_response = response_body["rooms"][room_id1]
|
||||
|
||||
self.assertNotIn("initial", room_response)
|
||||
self.assertNotIn("unstable_expanded_timeline", room_response)
|
||||
self.assertEqual(room_response["limited"], False)
|
||||
|
||||
self._assertTimelineEqual(
|
||||
room_id=room_id1,
|
||||
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
|
||||
expected_event_ids=[latest_event_id],
|
||||
message=str(room_response["timeline"]),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user