Mirror of https://github.com/element-hq/synapse.git

Compare commits: madlittlem ... hughns/ind (3 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 98c27ab824 |  |
|  | 70d166e7b5 |  |
|  | 96425d4071 |  |

@@ -1397,6 +1397,7 @@ class SyncHandler:
                 timeline_contains=timeline_state,
                 timeline_start=state_at_timeline_start,
                 timeline_end=state_at_timeline_end,
+                previous_timeline_start={},
                 previous_timeline_end={},
                 lazy_load_members=lazy_load_members,
             )
@@ -1535,6 +1536,17 @@ class SyncHandler:
                 await_full_state=await_full_state,
             )

+            state_at_previous_sync_start = (
+                {}
+                if since_token.prev_batch is None
+                else await self._state_storage_controller.get_state_ids_at(
+                    room_id,
+                    stream_position=since_token.prev_batch,
+                    state_filter=state_filter,
+                    await_full_state=await_full_state,
+                )
+            )
+
             state_at_timeline_end = await self._state_storage_controller.get_state_ids_at(
                 room_id,
                 stream_position=end_token,
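
The added block only consults the previous sync window when the since-token actually carries a `prev_batch`; otherwise it falls back to an empty state map, preserving the old behaviour. A minimal sketch of that fallback, assuming a hypothetical `fetch_state_ids` stub in place of Synapse's storage controller:

```python
import asyncio
from typing import Dict, Optional

StateMap = Dict[str, str]  # simplified: event type -> event id


async def fetch_state_ids(room_id: str, stream_position: str) -> StateMap:
    """Hypothetical stand-in for a storage lookup; returns canned data."""
    return {"m.call.member": "$s1_event_id"}


async def state_at_previous_sync_start(
    room_id: str, prev_batch: Optional[str]
) -> StateMap:
    # Same shape as the conditional in the diff: no prev_batch means we know
    # nothing extra about the previous window, so use an empty state map.
    return {} if prev_batch is None else await fetch_state_ids(room_id, prev_batch)


print(asyncio.run(state_at_previous_sync_start("!room:example.org", None)))
print(asyncio.run(state_at_previous_sync_start("!room:example.org", "t42_token")))
```
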
@@ -1546,6 +1558,7 @@ class SyncHandler:
                 timeline_contains=timeline_state,
                 timeline_start=state_at_timeline_start,
                 timeline_end=state_at_timeline_end,
+                previous_timeline_start=state_at_previous_sync_start,
                 previous_timeline_end=state_at_previous_sync,
                 lazy_load_members=lazy_load_members,
             )
@@ -1965,7 +1978,7 @@ class SyncHandler:
         # this is due to some of the underlying streams not supporting the ability
         # to query up to a given point.
         # Always use the `now_token` in `SyncResultBuilder`
-        now_token = self.event_sources.get_current_token()
+        now_token = self.event_sources.get_current_token(prev_batch=since_token)
         log_kv({"now_token": now_token})

         # Since we fetched the users room list before calculating the `now_token` (see
@@ -2980,6 +2993,7 @@ def _calculate_state(
     timeline_contains: StateMap[str],
     timeline_start: StateMap[str],
     timeline_end: StateMap[str],
+    previous_timeline_start: StateMap[str],
     previous_timeline_end: StateMap[str],
     lazy_load_members: bool,
 ) -> StateMap[str]:
@@ -3007,6 +3021,7 @@ def _calculate_state(

     timeline_end_ids = set(timeline_end.values())
     timeline_start_ids = set(timeline_start.values())
+    previous_timeline_start_ids = set(previous_timeline_start.values())
     previous_timeline_end_ids = set(previous_timeline_end.values())
     timeline_contains_ids = set(timeline_contains.values())

@@ -3082,7 +3097,7 @@ def _calculate_state(

     state_ids = (
         (timeline_end_ids | timeline_start_ids)
-        - previous_timeline_end_ids
+        - (previous_timeline_end_ids | previous_timeline_start_ids)
         - timeline_contains_ids
     )

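
To see what the widened subtraction changes, here is a toy illustration with invented event ids: subtracting the union of the previous window's start and end state drops everything that window already accounted for, where the old expression only dropped its end state.

```python
# Toy event-id sets; the ids are invented purely for illustration.
timeline_start_ids = {"$s1"}
timeline_end_ids = {"$s3", "$other"}
previous_timeline_start_ids = {"$s1"}
previous_timeline_end_ids = {"$s2"}
timeline_contains_ids = {"$e5"}

# Before: only the state at the *end* of the previous window is subtracted,
# so $s1 (already covered at the start of that window) is included again.
old_state_ids = (
    (timeline_end_ids | timeline_start_ids)
    - previous_timeline_end_ids
    - timeline_contains_ids
)

# After: subtract the union of the previous window's start and end state.
new_state_ids = (
    (timeline_end_ids | timeline_start_ids)
    - (previous_timeline_end_ids | previous_timeline_start_ids)
    - timeline_contains_ids
)

print(sorted(old_state_ids))  # ['$other', '$s1', '$s3']
print(sorted(new_state_ids))  # ['$other', '$s3']
```
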
@@ -77,7 +77,7 @@ class EventSources:
         self.store = hs.get_datastores().main
         self._instance_name = hs.get_instance_name()

-    def get_current_token(self) -> StreamToken:
+    def get_current_token(self, prev_batch: StreamToken = None) -> StreamToken:
         push_rules_key = self.store.get_max_push_rules_stream_id()
         to_device_key = self.store.get_to_device_stream_token()
         device_list_key = self.store.get_device_stream_token()
@@ -97,6 +97,7 @@ class EventSources:
             # Groups key is unused.
             groups_key=0,
             un_partial_stated_rooms_key=un_partial_stated_rooms_key,
+            prev_batch=prev_batch,
         )
         return token

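
The two `EventSources` hunks thread an optional `prev_batch` through `get_current_token` and stamp it onto the token that is handed back. A minimal sketch of that pattern, assuming simplified stand-in types (`Token` and `Sources` are invented for illustration and are not Synapse's classes):

```python
from dataclasses import dataclass
from typing import Optional


# Simplified stand-ins; Synapse's real StreamToken carries many more stream keys.
@dataclass(frozen=True)
class Token:
    room_key: int
    prev_batch: Optional["Token"] = None


class Sources:
    """Toy analogue of an event-source registry: hands out the current position."""

    def __init__(self) -> None:
        self._room_key = 0

    def advance(self) -> None:
        self._room_key += 1

    def get_current_token(self, prev_batch: Optional[Token] = None) -> Token:
        # Stamp the current position with the token the client synced from, so
        # the next request can recover where the previous window started.
        return Token(room_key=self._room_key, prev_batch=prev_batch)


sources = Sources()
since = sources.get_current_token()                 # token handed out by sync N
sources.advance()
now = sources.get_current_token(prev_batch=since)   # sync N+1 remembers sync N
assert now.prev_batch == since and now.room_key == 1
```
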
@@ -980,16 +980,30 @@ class StreamToken:
     groups_key: int
     un_partial_stated_rooms_key: int

+    prev_batch: Optional["StreamToken"] = None
+    _BATCH_SEPARATOR = "~"
+
     _SEPARATOR = "_"
     START: ClassVar["StreamToken"]

     @classmethod
     @cancellable
-    async def from_string(cls, store: "DataStore", string: str) -> "StreamToken":
+    async def from_string(
+        cls, store: "DataStore", string: str, prev_batch: Optional["StreamToken"] = None
+    ) -> "StreamToken":
         """
         Creates a RoomStreamToken from its textual representation.
         """
         try:
+            if string.count(cls._BATCH_SEPARATOR) == 1:
+                # We have a prev_token
+                batches = string.split(cls._BATCH_SEPARATOR)
+                prev_batch = await StreamToken.from_string(store, batches[1])
+                batch = await StreamToken.from_string(
+                    store, batches[0], prev_batch=prev_batch
+                )
+                return batch
+
             keys = string.split(cls._SEPARATOR)
             while len(keys) < len(attr.fields(cls)):
                 # i.e. old token from before receipt_key
@@ -1006,6 +1020,7 @@ class StreamToken:
                 device_list_key,
                 groups_key,
                 un_partial_stated_rooms_key,
+                prev_batch,
             ) = keys

             return cls(
@@ -1025,24 +1040,34 @@ class StreamToken:
         except Exception:
             raise SynapseError(400, "Invalid stream token")

-    async def to_string(self, store: "DataStore") -> str:
-        return self._SEPARATOR.join(
-            [
-                await self.room_key.to_string(store),
-                str(self.presence_key),
-                str(self.typing_key),
-                await self.receipt_key.to_string(store),
-                str(self.account_data_key),
-                str(self.push_rules_key),
-                str(self.to_device_key),
-                str(self.device_list_key),
-                # Note that the groups key is no longer used, but it is still
-                # serialized so that there will not be confusion in the future
-                # if additional tokens are added.
-                str(self.groups_key),
-                str(self.un_partial_stated_rooms_key),
-            ]
-        )
+    async def to_string(
+        self, store: "DataStore", include_prev_batch: bool = True
+    ) -> str:
+        if include_prev_batch and self.prev_batch:
+            return self._BATCH_SEPARATOR.join(
+                [
+                    await self.to_string(store, include_prev_batch=False),
+                    await self.prev_batch.to_string(store, include_prev_batch=False),
+                ]
+            )
+        else:
+            return self._SEPARATOR.join(
+                [
+                    await self.room_key.to_string(store),
+                    str(self.presence_key),
+                    str(self.typing_key),
+                    await self.receipt_key.to_string(store),
+                    str(self.account_data_key),
+                    str(self.push_rules_key),
+                    str(self.to_device_key),
+                    str(self.device_list_key),
+                    # Note that the groups key is no longer used, but it is still
+                    # serialized so that there will not be confusion in the future
+                    # if additional tokens are added.
+                    str(self.groups_key),
+                    str(self.un_partial_stated_rooms_key),
+                ]
+            )

     @property
     def room_stream_id(self) -> int:
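
The serialization changes encode an attached `prev_batch` by joining two ordinary token strings with the `~` batch separator, and `from_string` splits on that separator to rebuild the nested token. A self-contained sketch of the round trip, assuming a deliberately tiny `MiniToken` with two integer keys instead of the real token's many keys and async storage lookups:

```python
from dataclasses import dataclass
from typing import Optional

_SEPARATOR = "_"
_BATCH_SEPARATOR = "~"


@dataclass(frozen=True)
class MiniToken:
    """Invented two-field token used only to illustrate the encoding."""

    room_key: int
    device_list_key: int
    prev_batch: Optional["MiniToken"] = None

    def to_string(self, include_prev_batch: bool = True) -> str:
        # If a prev_batch is attached, emit "<this token>~<previous token>".
        if include_prev_batch and self.prev_batch:
            return _BATCH_SEPARATOR.join(
                [
                    self.to_string(include_prev_batch=False),
                    self.prev_batch.to_string(include_prev_batch=False),
                ]
            )
        return _SEPARATOR.join([str(self.room_key), str(self.device_list_key)])

    @classmethod
    def from_string(cls, string: str) -> "MiniToken":
        # One batch separator means the string carries a nested previous token.
        if string.count(_BATCH_SEPARATOR) == 1:
            head, prev = string.split(_BATCH_SEPARATOR)
            token = cls.from_string(head)
            return cls(token.room_key, token.device_list_key, cls.from_string(prev))
        room_key, device_list_key = string.split(_SEPARATOR)
        return cls(int(room_key), int(device_list_key))


since = MiniToken(room_key=5, device_list_key=2)
now = MiniToken(room_key=9, device_list_key=3, prev_batch=since)
encoded = now.to_string()
print(encoded)  # 9_3~5_2
assert MiniToken.from_string(encoded) == now
```
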
@@ -710,9 +710,293 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
             [e.event_id for e in room_sync.timeline.events],
             [e4_event, e5_event],
         )

+    def test_state_after_on_branches_winner_at_end_of_timeline(self) -> None:
+        r"""Test `state` and `state_after` where not all information is in `state` + `timeline`.
+
+        -----|---------- initial sync
+             |
+          unrelated state event
+             |
+             S1
+        -----|---------- incremental sync 1
+            ↗ ↖
+           |     S2
+        --|------|------ incremental sync 2
+          E3     E4
+        --|------|------ incremental sync 3
+          |      |
+           \    ↗   S2 wins
+             E5
+        -----|---------- incremental sync 4
+
+        The "interesting" sync is sync 3. At the end of sync 3 the server doesn't know which branch will win.
+
+        """
+        alice = self.register_user("alice", "password")
+        alice_tok = self.login(alice, "password")
+        alice_requester = create_requester(alice)
+        room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
+
+        # Do an initial sync to get a known starting point.
+        initial_sync_result = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(alice),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
+            )
+        )
+
+        # Send an unrelated state event which doesn't change across the branches
+        unrelated_state_event = self.helper.send_state(
+            room_id, "m.something.else", {"node": "S1"}, tok=alice_tok
+        )["event_id"]
+
+        # Send S1
+        s1_event = self.helper.send_state(
+            room_id, "m.call.member", {"node": "S1"}, tok=alice_tok
+        )["event_id"]
+
+        # Incremental sync 1
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(alice),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
+                since_token=initial_sync_result.next_batch,
+            )
+        )
+        room_sync = incremental_sync.joined[0]
+
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertEqual(room_sync.state, {})
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [unrelated_state_event, s1_event],
+        )
+
+        # Send S2 -> S1
+        s2_event = self.helper.send_state(
+            room_id, "m.call.member", {"node": "S2"}, tok=alice_tok
+        )["event_id"]
+
+        # Incremental sync 2
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(alice),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
+                since_token=incremental_sync.next_batch,
+            )
+        )
+        room_sync = incremental_sync.joined[0]
+
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertEqual(room_sync.state, {})
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [s2_event],
+        )
+
+        # Send two regular events on different branches:
+        # E3 -> S1
+        # E4 -> S2
+        with self._patch_get_latest_events([s1_event]):
+            e3_event = self.helper.send(room_id, "E3", tok=alice_tok)["event_id"]
+        with self._patch_get_latest_events([s2_event]):
+            e4_event = self.helper.send(room_id, "E4", tok=alice_tok)["event_id"]
+
+        # Incremental sync 3
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(alice),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
+                since_token=incremental_sync.next_batch,
+            )
+        )
+        room_sync = incremental_sync.joined[0]
+
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertEqual(room_sync.state, {})
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [
+                e3_event,
+                e4_event,
+            ],  # We have two events from different timelines neither of which are state events
+        )
+
+        # Send E5 which resolves the branches
+        e5_event = self.helper.send(room_id, "E5", tok=alice_tok)["event_id"]
+
+        # Incremental sync 4
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(alice),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
+                since_token=incremental_sync.next_batch,
+            )
+        )
+        room_sync = incremental_sync.joined[0]
+
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertEqual(room_sync.state, {})
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [e5_event],
+        )
+        # FIXED: S2 is the winning state event and also the last that the client saw!
+
+    def test_state_after_on_branches_winner_at_start_of_timeline(self) -> None:
+        r"""Test `state` and `state_after` where not all information is in `state` + `timeline`.
+
+        -----|---------- initial sync
+             |
+          unrelated state event
+             |
+             S1
+        -----|---------- incremental sync 1
+            ↗ ↖
+           |     S2
+        --|------|------ incremental sync 2
+          S3     E4
+        --|------|------ incremental sync 3
+          |      |
+           ↖    /   S3 wins
+             E5
+        -----|---------- incremental sync 4
+
+        The "interesting" sync is sync 3. At the end of sync 3 the server doesn't know which branch will win.
+
+        """
+        alice = self.register_user("alice", "password")
+        alice_tok = self.login(alice, "password")
+        alice_requester = create_requester(alice)
+        room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
+
+        # Do an initial sync to get a known starting point.
+        initial_sync_result = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(alice),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
+            )
+        )
+
+        # Send an unrelated state event which doesn't change across the branches
+        unrelated_state_event = self.helper.send_state(
+            room_id, "m.something.else", {"node": "S1"}, tok=alice_tok
+        )["event_id"]
+
+        # Send S1
+        s1_event = self.helper.send_state(
+            room_id, "m.call.member", {"node": "S1"}, tok=alice_tok
+        )["event_id"]
+
+        # Incremental sync 1
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(alice),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
+                since_token=initial_sync_result.next_batch,
+            )
+        )
+        room_sync = incremental_sync.joined[0]
+
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertEqual(room_sync.state, {})
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [unrelated_state_event, s1_event],
+        )
+
+        # Send S2 -> S1
+        s2_event = self.helper.send_state(
+            room_id, "m.call.member", {"node": "S2"}, tok=alice_tok
+        )["event_id"]
+
+        # Incremental sync 2
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(alice),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
+                since_token=incremental_sync.next_batch,
+            )
+        )
+        room_sync = incremental_sync.joined[0]
+
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertEqual(room_sync.state, {})
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [s2_event],
+        )
+
+        # Send two events on different branches:
+        # S3 -> S1
+        # E4 -> S2
+        with self._patch_get_latest_events([s1_event]):
+            s3_event = self.helper.send_state(
+                room_id, "m.call.member", {"node": "S3"}, tok=alice_tok
+            )["event_id"]
+        with self._patch_get_latest_events([s2_event]):
+            e4_event = self.helper.send(room_id, "E4", tok=alice_tok)["event_id"]
+
+        # Incremental sync 3
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(alice),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
+                since_token=incremental_sync.next_batch,
+            )
+        )
+        room_sync = incremental_sync.joined[0]
+
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertEqual(room_sync.state, {})
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [
+                s3_event,
+                e4_event,
+            ],  # We have two events from different timelines
+        )
+
+        # Send E5 which resolves the branches with S3 winning
+        e5_event = self.helper.send(room_id, "E5", tok=alice_tok)["event_id"]
+
+        # Incremental sync 4
+        incremental_sync = self.get_success(
+            self.sync_handler.wait_for_sync_for_user(
+                alice_requester,
+                generate_sync_config(alice),
+                sync_version=SyncVersion.SYNC_V2,
+                request_key=generate_request_key(),
+                since_token=incremental_sync.next_batch,
+            )
+        )
+        room_sync = incremental_sync.joined[0]
+
+        self.assertEqual(room_sync.room_id, room_id)
+        self.assertEqual(
+            [e.event_id for e in room_sync.state.values()],
-            [s2_event],
+            [s3_event],  # S3 is the winning state event
+        )
+        self.assertEqual(
+            [e.event_id for e in room_sync.timeline.events],
+            [e5_event],
+        )

     @parameterized.expand(