Mirror of https://github.com/element-hq/synapse.git

Compare commits: anoa/allow ... mv/non-ll- (11 commits)
- 46062d8019
- 7b60abea53
- 6472178a41
- fbfafca0ab
- 1cca9eb32b
- 6023bee27f
- 33d9642c0e
- 4441f5b991
- b01ec64600
- 661e25bfa4
- 3bad02fbfe
changelog.d/14831.feature (new file)
@@ -0,0 +1 @@
+Faster joins: non lazy-loading syncs will return immediately after a faster join, by omitting partial state rooms until we acquire their full state.
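The changelog entry above summarizes the behavioural change. As an illustrative sketch only (not Synapse's actual code; `rooms_to_include` and its parameters are hypothetical), the gating rule the hunks below implement looks like this:

```python
from typing import Iterable, List, Set


def rooms_to_include(
    joined_rooms: Iterable[str],
    partial_state_rooms: Set[str],
    lazy_load_members: bool,
) -> List[str]:
    # Lazy-loading-members syncs don't need a room's full member list up
    # front, so partial-state rooms can be returned to them immediately.
    if lazy_load_members:
        return list(joined_rooms)
    # Non-lazy-loading syncs omit partial-state rooms until the state
    # resync completes, letting the sync return without blocking on it.
    return [room for room in joined_rooms if room not in partial_state_rooms]
```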
@@ -1726,15 +1726,16 @@ class FederationHandler:
         await self._device_handler.handle_room_un_partial_stated(room_id)
 
         logger.info("Clearing partial-state flag for %s", room_id)
-        success = await self.store.clear_partial_state_room(room_id)
-        if success:
+        new_stream_id = await self.store.clear_partial_state_room(room_id)
+        if new_stream_id is not None:
             logger.info("State resync complete for %s", room_id)
             self._storage_controllers.state.notify_room_un_partial_stated(
                 room_id
             )
-            # Poke the notifier so that other workers see the write to
-            # the un-partial-stated rooms stream.
-            self._notifier.notify_replication()
+
+            await self._notifier.on_un_partial_stated_room(
+                room_id, new_stream_id
+            )
 
             # TODO(faster_joins) update room stats and user directory?
             # https://github.com/matrix-org/synapse/issues/12814
@@ -1613,9 +1613,9 @@ class SyncHandler:
         now_token = sync_result_builder.now_token
         since_stream_id = 0
         if sync_result_builder.since_token is not None:
-            since_stream_id = int(sync_result_builder.since_token.to_device_key)
+            since_stream_id = sync_result_builder.since_token.to_device_key
 
-        if device_id is not None and since_stream_id != int(now_token.to_device_key):
+        if device_id is not None and since_stream_id != now_token.to_device_key:
             messages, stream_id = await self.store.get_messages_for_device(
                 user_id, device_id, since_stream_id, now_token.to_device_key
             )
@@ -1684,7 +1684,7 @@ class SyncHandler:
         )
 
         push_rules_changed = await self.store.have_push_rules_changed_for_user(
-            user_id, int(since_token.push_rules_key)
+            user_id, since_token.push_rules_key
         )
 
         if push_rules_changed:
@@ -1817,11 +1817,35 @@ class SyncHandler:
         )
         sync_result_builder.now_token = now_token
 
+        # Retrieve rooms that were un-partial-stated in the meantime; this is
+        # only useful for a non-lazy-loading-members sync. We also skip this
+        # calculation on an initial sync, since it isn't needed there.
+        un_partial_stated_rooms = set()
+        if (
+            since_token
+            and not sync_result_builder.sync_config.filter_collection.lazy_load_members()
+        ):
+            un_partial_stated_rooms_since = 0
+            if sync_result_builder.since_token is not None:
+                un_partial_stated_rooms_since = (
+                    sync_result_builder.since_token.un_partial_stated_rooms_key
+                )
+
+            un_partial_stated_rooms = (
+                await self.store.get_un_partial_stated_rooms_between(
+                    un_partial_stated_rooms_since,
+                    sync_result_builder.now_token.un_partial_stated_rooms_key,
+                    sync_result_builder.joined_room_ids,
+                )
+            )
+
         # 2. We check up front if anything has changed, if it hasn't then there is
         # no point in going further.
         if not sync_result_builder.full_state:
             if since_token and not ephemeral_by_room and not account_data_by_room:
-                have_changed = await self._have_rooms_changed(sync_result_builder)
+                have_changed = await self._have_rooms_changed(
+                    sync_result_builder, un_partial_stated_rooms
+                )
                 log_kv({"rooms_have_changed": have_changed})
                 if not have_changed:
                     tags_by_room = await self.store.get_updated_tags(
@@ -1835,7 +1859,7 @@ class SyncHandler:
         ignored_users = await self.store.ignored_users(user_id)
         if since_token:
             room_changes = await self._get_rooms_changed(
-                sync_result_builder, ignored_users
+                sync_result_builder, ignored_users, un_partial_stated_rooms
            )
             tags_by_room = await self.store.get_updated_tags(
                 user_id, since_token.account_data_key
@@ -1888,7 +1912,9 @@ class SyncHandler:
         )
 
     async def _have_rooms_changed(
-        self, sync_result_builder: "SyncResultBuilder"
+        self,
+        sync_result_builder: "SyncResultBuilder",
+        un_partial_stated_rooms: AbstractSet[str],
     ) -> bool:
         """Returns whether there may be any new events that should be sent down
         the sync. Returns True if there are.
@@ -1905,6 +1931,11 @@ class SyncHandler:
 
         stream_id = since_token.room_key.stream
         for room_id in sync_result_builder.joined_room_ids:
+            # If a room has been un-partial-stated during the sync period,
+            # assume it has seen some kind of change. We'll process that
+            # change later, in _get_rooms_changed.
+            if room_id in un_partial_stated_rooms:
+                return True
             if self.store.has_room_changed_since(room_id, stream_id):
                 return True
         return False
@@ -1913,6 +1944,7 @@ class SyncHandler:
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
+        un_partial_stated_rooms: AbstractSet[str],
     ) -> _RoomChanges:
         """Determine the changes in rooms to report to the user.
 
@@ -2116,7 +2148,25 @@ class SyncHandler:
             room_entry = room_to_events.get(room_id, None)
 
             newly_joined = room_id in newly_joined_rooms
-            if room_entry:
+
+            # Partially joined rooms are omitted from non-lazy-loading-members
+            # syncs until the resync completes and the room is fully stated.
+            # When that happens, we need to include their full state in
+            # the next non-lazy-loading sync.
+            if (
+                not sync_config.filter_collection.lazy_load_members()
+                and room_id in un_partial_stated_rooms
+            ):
+                entry = RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="joined",
+                    events=None,
+                    newly_joined=True,
+                    full_state=True,
+                    since_token=None,
+                    upto_token=now_token,
+                )
+            elif room_entry:
                 events, start_key = room_entry
 
                 prev_batch_token = now_token.copy_and_replace(
@@ -2186,6 +2236,13 @@ class SyncHandler:
         knocked = []
 
         for event in room_list:
+            # Do not include rooms whose full state we don't yet have, in the
+            # case of a non-lazy-loading-members sync.
+            if (
+                not sync_config.filter_collection.lazy_load_members()
+            ) and await self.store.is_partial_state_room(event.room_id):
+                continue
+
             if event.room_version_id not in KNOWN_ROOM_VERSIONS:
                 continue
 
@@ -315,6 +315,32 @@ class Notifier:
                 event_entries.append((entry, event.event_id))
         await self.notify_new_room_events(event_entries, max_room_stream_token)
 
+    async def on_un_partial_stated_room(
+        self,
+        room_id: str,
+        new_token: int,
+    ) -> None:
+        """Used by the resync background processes to wake up all listeners
+        of this room, informing them that it has just been un-partial-stated.
+
+        It will also notify replication listeners of the change in stream.
+        """
+
+        # Wake up all related user stream notifiers.
+        user_streams = self.room_to_user_streams.get(room_id, set())
+        time_now_ms = self.clock.time_msec()
+        for user_stream in user_streams:
+            try:
+                user_stream.notify(
+                    StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
+                )
+            except Exception:
+                logger.exception("Failed to notify listener")
+
+        # Poke replication so that other workers also see the write to
+        # the un-partial-stated rooms stream.
+        self.notify_replication()
+
     async def notify_new_room_events(
         self,
         event_entries: List[Tuple[_PendingRoomEventEntry, str]],
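Tying the two handlers together: once a state resync finishes, the FederationHandler code path shown earlier drives this new notifier method. A condensed sketch (the `store`/`notifier` wiring here is hypothetical; the calls themselves appear verbatim in the hunks above):

```python
async def on_resync_complete(store, notifier, room_id: str) -> None:
    # Returns the new stream id, or None if partial-state events remain.
    new_stream_id = await store.clear_partial_state_room(room_id)
    if new_stream_id is not None:
        # Wakes sleeping /sync requests for users in the room and pokes
        # replication so other workers also see the stream write.
        await notifier.on_un_partial_stated_room(room_id, new_stream_id)
```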
@@ -292,6 +292,7 @@ class RelationsWorkerStore(SQLBaseStore):
             to_device_key=0,
             device_list_key=0,
             groups_key=0,
+            un_partial_stated_rooms_key=0,
         )
 
         return events[:limit], next_token
@@ -26,6 +26,7 @@ from typing import (
     Mapping,
     Optional,
     Sequence,
+    Set,
     Tuple,
     Union,
     cast,
@@ -1285,10 +1286,44 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         # explanation.)
         return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
 
+    async def get_un_partial_stated_rooms_between(
+        self, last_id: int, current_id: int, room_ids: Collection[str]
+    ) -> Set[str]:
+        """Get all rooms that were un-partial-stated between `last_id`
+        (exclusive) and `current_id` (inclusive).
+
+        Returns:
+            The set of room ids.
+        """
+
+        if last_id == current_id:
+            return set()
+
+        def _get_un_partial_stated_rooms_between_txn(
+            txn: LoggingTransaction,
+        ) -> Set[str]:
+            sql = """
+                SELECT DISTINCT room_id FROM un_partial_stated_room_stream
+                WHERE ? < stream_id AND stream_id <= ? AND
+            """
+
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", room_ids
+            )
+
+            txn.execute(sql + clause, [last_id, current_id] + list(args))
+
+            return {r[0] for r in txn}
+
+        return await self.db_pool.runInteraction(
+            "get_un_partial_stated_rooms_between",
+            _get_un_partial_stated_rooms_between_txn,
+        )
+
     async def get_un_partial_stated_rooms_from_stream(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
-        """Get updates for caches replication stream.
+        """Get updates for the un-partial-stated rooms replication stream.
 
         Args:
             instance_name: The writer we want to fetch updates from. Unused
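For illustration, `make_in_list_sql_clause` completes the trailing `AND` in `get_un_partial_stated_rooms_between` with an IN-list (or an engine-specific equivalent). Assuming SQLite-style `?` placeholders and two room ids, the final query would look roughly like this sketch:

```python
# Hypothetical expansion for two room ids; the real clause shape depends on
# the database engine.
room_ids = ["!a:hs", "!b:hs"]
clause = f"room_id IN ({', '.join('?' for _ in room_ids)})"  # room_id IN (?, ?)
sql = (
    "SELECT DISTINCT room_id FROM un_partial_stated_room_stream "
    "WHERE ? < stream_id AND stream_id <= ? AND " + clause
)
last_id, current_id = 370, 379
params = [last_id, current_id, *room_ids]
# Note the half-open bounds: strictly after last_id, up to and including
# current_id, matching the method's docstring.
```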
@@ -2295,16 +2330,16 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             (room_id,),
         )
 
-    async def clear_partial_state_room(self, room_id: str) -> bool:
+    async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
         """Clears the partial state flag for a room.
 
         Args:
             room_id: The room whose partial state flag is to be cleared.
 
         Returns:
-            `True` if the partial state flag has been cleared successfully.
+            The corresponding stream id for the un-partial-stated rooms stream.
 
-            `False` if the partial state flag could not be cleared because the room
+            `None` if the partial state flag could not be cleared because the room
             still contains events with partial state.
         """
         try:
@@ -2315,7 +2350,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                 room_id,
                 un_partial_state_room_stream_id,
             )
-            return True
+            return un_partial_state_room_stream_id
         except self.db_pool.engine.module.IntegrityError as e:
             # Assume that any `IntegrityError`s are due to partial state events.
             logger.info(
@@ -2323,7 +2358,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                 room_id,
                 e,
             )
-            return False
+            return None
 
     def _clear_partial_state_room_txn(
         self,
@@ -58,6 +58,7 @@ class EventSources:
         push_rules_key = self.store.get_max_push_rules_stream_id()
         to_device_key = self.store.get_to_device_stream_token()
         device_list_key = self.store.get_device_stream_token()
+        un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token()
 
         token = StreamToken(
             room_key=self.sources.room.get_current_key(),
@@ -70,6 +71,7 @@ class EventSources:
             device_list_key=device_list_key,
             # Groups key is unused.
             groups_key=0,
+            un_partial_stated_rooms_key=un_partial_stated_rooms_key,
         )
         return token
@@ -107,5 +109,6 @@ class EventSources:
             to_device_key=0,
             device_list_key=0,
             groups_key=0,
+            un_partial_stated_rooms_key=0,
         )
         return token
@@ -627,6 +627,7 @@ class StreamKeyType:
     PUSH_RULES: Final = "push_rules_key"
     TO_DEVICE: Final = "to_device_key"
     DEVICE_LIST: Final = "device_list_key"
+    UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -634,7 +635,7 @@ class StreamToken:
     """A collection of keys joined together by underscores in the following
     order and which represent the position in their respective streams.
 
-    ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1`
+    ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379`
     1. `room_key`: `s2633508` which is a `RoomStreamToken`
        - `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59`
        - See the docstring for `RoomStreamToken` for more details.
@@ -646,12 +647,13 @@ class StreamToken:
     7. `to_device_key`: `274711`
     8. `device_list_key`: `265584`
     9. `groups_key`: `1` (note that this key is now unused)
+    10. `un_partial_stated_rooms_key`: `379`
 
     You can see how many of these keys correspond to the various
     fields in a "/sync" response:
     ```json
     {
-        "next_batch": "s12_4_0_1_1_1_1_4_1",
+        "next_batch": "s12_4_0_1_1_1_1_4_1_1",
         "presence": {
             "events": []
         },
@@ -663,7 +665,7 @@ class StreamToken:
             "!QrZlfIDQLNLdZHqTnt:hs1": {
                 "timeline": {
                     "events": [],
-                    "prev_batch": "s10_4_0_1_1_1_1_4_1",
+                    "prev_batch": "s10_4_0_1_1_1_1_4_1_1",
                     "limited": false
                 },
                 "state": {
@@ -699,6 +701,7 @@ class StreamToken:
     device_list_key: int
     # Note that the groups key is no longer used and may have bogus values.
     groups_key: int
+    un_partial_stated_rooms_key: int
 
     _SEPARATOR = "_"
     START: ClassVar["StreamToken"]
@@ -737,6 +740,7 @@ class StreamToken:
                 # serialized so that there will not be confusion in the future
                 # if additional tokens are added.
                 str(self.groups_key),
+                str(self.un_partial_stated_rooms_key),
             ]
         )
@@ -769,7 +773,7 @@ class StreamToken:
         return attr.evolve(self, **{key: new_value})
 
 
-StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
+StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
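To make the docstring example concrete: serialization stringifies each key in stream order and joins them with `_`, so this change simply appends a tenth field. A minimal sketch (middle field names are elided here, since the excerpt only names positions 1 and 7 through 10):

```python
fields = [
    "s2633508",  # 1. room_key, a RoomStreamToken
    "17", "338", "6732159", "1082514", "541479",  # 2.-6., see full docstring
    "274711",    # 7. to_device_key
    "265584",    # 8. device_list_key
    "1",         # 9. groups_key (unused)
    "379",       # 10. un_partial_stated_rooms_key (new)
]
assert "_".join(fields) == "s2633508_17_338_6732159_1082514_541479_274711_265584_1_379"
```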
@@ -1831,7 +1831,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
 
     def test_topo_token_is_accepted(self) -> None:
         """Test Topo Token is accepted."""
-        token = "t1-0_0_0_0_0_0_0_0_0"
+        token = "t1-0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@@ -1845,7 +1845,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
 
     def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
         """Test that stream token is accepted for forward pagination."""
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@@ -1987,7 +1987,7 @@ class RoomMessageListTestCase(RoomBase):
         self.room_id = self.helper.create_room_as(self.user_id)
 
     def test_topo_token_is_accepted(self) -> None:
-        token = "t1-0_0_0_0_0_0_0_0_0"
+        token = "t1-0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
         )
@@ -1998,7 +1998,7 @@ class RoomMessageListTestCase(RoomBase):
         self.assertTrue("end" in channel.json_body)
 
     def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
         )
@@ -2728,7 +2728,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """Test that we can filter by a label on a /messages request."""
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2745,7 +2745,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """Test that we can filter by the absence of a label on a /messages request."""
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2768,7 +2768,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"