mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-09 01:30:18 +00:00
Compare commits
8 Commits
quenting/l
...
squah/fast
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6fd9f45619 | ||
|
|
ad8a2b3c6e | ||
|
|
04bee9e709 | ||
|
|
5274c8779b | ||
|
|
10013eaacc | ||
|
|
ad8bab8548 | ||
|
|
0a539b9b57 | ||
|
|
31a2e5e417 |
1
changelog.d/13477.misc
Normal file
1
changelog.d/13477.misc
Normal file
@@ -0,0 +1 @@
|
||||
Faster room joins: Avoid blocking lazy-loading `/sync`s during partial joins due to remote memberships. Pull remote memberships from auth events instead of the room state.
|
||||
@@ -517,10 +517,17 @@ class SyncHandler:
|
||||
# ensure that we always include current state in the timeline
|
||||
current_state_ids: FrozenSet[str] = frozenset()
|
||||
if any(e.is_state() for e in recents):
|
||||
# FIXME(faster_joins): We use the partial state here as
|
||||
# we don't want to block `/sync` on finishing a lazy join.
|
||||
# Which should be fine once
|
||||
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
|
||||
# since we shouldn't reach here anymore?
|
||||
# Note that we use the current state as a whitelist for filtering
|
||||
# `recents`, so partial state is only a problem when a membership
|
||||
# event turns up in `recents` but has not made it into the current
|
||||
# state.
|
||||
current_state_ids_map = (
|
||||
await self._state_storage_controller.get_current_state_ids(
|
||||
room_id
|
||||
)
|
||||
await self.store.get_partial_current_state_ids(room_id)
|
||||
)
|
||||
current_state_ids = frozenset(current_state_ids_map.values())
|
||||
|
||||
@@ -589,7 +596,13 @@ class SyncHandler:
|
||||
if any(e.is_state() for e in loaded_recents):
|
||||
# FIXME(faster_joins): We use the partial state here as
|
||||
# we don't want to block `/sync` on finishing a lazy join.
|
||||
# Is this the correct way of doing it?
|
||||
# Which should be fine once
|
||||
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
|
||||
# since we shouldn't reach here anymore?
|
||||
# Note that we use the current state as a whitelist for filtering
|
||||
# `loaded_recents`, so partial state is only a problem when a
|
||||
# membership event turns up in `loaded_recents` but has not made it
|
||||
# into the current state.
|
||||
current_state_ids_map = (
|
||||
await self.store.get_partial_current_state_ids(room_id)
|
||||
)
|
||||
@@ -637,7 +650,10 @@ class SyncHandler:
|
||||
)
|
||||
|
||||
async def get_state_after_event(
|
||||
self, event_id: str, state_filter: Optional[StateFilter] = None
|
||||
self,
|
||||
event_id: str,
|
||||
state_filter: Optional[StateFilter] = None,
|
||||
await_full_state: Optional[bool] = None,
|
||||
) -> StateMap[str]:
|
||||
"""
|
||||
Get the room state after the given event
|
||||
@@ -645,9 +661,14 @@ class SyncHandler:
|
||||
Args:
|
||||
event_id: event of interest
|
||||
state_filter: The state filter used to fetch state from the database.
|
||||
await_full_state: if `True`, will block if we do not yet have complete state
|
||||
at the event. Defaults to `True` unless `state_filter` can be completely
|
||||
satisfied with partial state.
|
||||
"""
|
||||
state_ids = await self._state_storage_controller.get_state_ids_for_event(
|
||||
event_id, state_filter=state_filter or StateFilter.all()
|
||||
event_id,
|
||||
state_filter=state_filter or StateFilter.all(),
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
|
||||
# using get_metadata_for_events here (instead of get_event) sidesteps an issue
|
||||
@@ -670,6 +691,7 @@ class SyncHandler:
|
||||
room_id: str,
|
||||
stream_position: StreamToken,
|
||||
state_filter: Optional[StateFilter] = None,
|
||||
await_full_state: Optional[bool] = None,
|
||||
) -> StateMap[str]:
|
||||
"""Get the room state at a particular stream position
|
||||
|
||||
@@ -677,6 +699,10 @@ class SyncHandler:
|
||||
room_id: room for which to get state
|
||||
stream_position: point at which to get state
|
||||
state_filter: The state filter used to fetch state from the database.
|
||||
await_full_state: if `True`, will block if we do not yet have complete state
|
||||
at the last event in the room before `stream_position`. Defaults to
|
||||
`True` unless `state_filter` can be completely satisfied with partial
|
||||
state.
|
||||
"""
|
||||
# FIXME: This gets the state at the latest event before the stream ordering,
|
||||
# which might not be the same as the "current state" of the room at the time
|
||||
@@ -688,7 +714,9 @@ class SyncHandler:
|
||||
|
||||
if last_event_id:
|
||||
state = await self.get_state_after_event(
|
||||
last_event_id, state_filter=state_filter or StateFilter.all()
|
||||
last_event_id,
|
||||
state_filter=state_filter or StateFilter.all(),
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
|
||||
else:
|
||||
@@ -890,8 +918,14 @@ class SyncHandler:
|
||||
|
||||
with Measure(self.clock, "compute_state_delta"):
|
||||
# The memberships needed for events in the timeline.
|
||||
# A dictionary with user IDs as keys and the first event in the timeline
|
||||
# requiring each member as values.
|
||||
# Only calculated when `lazy_load_members` is on.
|
||||
members_to_fetch = None
|
||||
members_to_fetch: Optional[Dict[str, Optional[EventBase]]] = None
|
||||
|
||||
# The contribution to the room state from state events in the timeline.
|
||||
# Only contains the last event for any given state key.
|
||||
timeline_state: StateMap[str]
|
||||
|
||||
lazy_load_members = sync_config.filter_collection.lazy_load_members()
|
||||
include_redundant_members = (
|
||||
@@ -902,29 +936,38 @@ class SyncHandler:
|
||||
# We only request state for the members needed to display the
|
||||
# timeline:
|
||||
|
||||
members_to_fetch = {
|
||||
event.sender # FIXME: we also care about invite targets etc.
|
||||
for event in batch.events
|
||||
}
|
||||
timeline_state = {}
|
||||
|
||||
members_to_fetch = {}
|
||||
for event in batch.events:
|
||||
# We need the event's sender, unless their membership was in a
|
||||
# previous timeline event.
|
||||
if (
|
||||
EventTypes.Member,
|
||||
event.sender,
|
||||
) not in timeline_state and event.sender not in members_to_fetch:
|
||||
members_to_fetch[event.sender] = event
|
||||
# FIXME: we also care about invite targets etc.
|
||||
|
||||
if event.is_state():
|
||||
timeline_state[(event.type, event.state_key)] = event.event_id
|
||||
|
||||
if full_state:
|
||||
# always make sure we LL ourselves so we know we're in the room
|
||||
# (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
|
||||
# We only need apply this on full state syncs given we disabled
|
||||
# LL for incr syncs in #3840.
|
||||
members_to_fetch.add(sync_config.user.to_string())
|
||||
members_to_fetch[sync_config.user.to_string()] = None
|
||||
|
||||
state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
|
||||
else:
|
||||
state_filter = StateFilter.all()
|
||||
timeline_state = {
|
||||
(event.type, event.state_key): event.event_id
|
||||
for event in batch.events
|
||||
if event.is_state()
|
||||
}
|
||||
|
||||
# The contribution to the room state from state events in the timeline.
|
||||
# Only contains the last event for any given state key.
|
||||
timeline_state = {
|
||||
(event.type, event.state_key): event.event_id
|
||||
for event in batch.events
|
||||
if event.is_state()
|
||||
}
|
||||
state_filter = StateFilter.all()
|
||||
|
||||
# Now calculate the state to return in the sync response for the room.
|
||||
# This is more or less the change in state between the end of the previous
|
||||
@@ -936,19 +979,26 @@ class SyncHandler:
|
||||
if batch:
|
||||
state_at_timeline_end = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[-1].event_id, state_filter=state_filter
|
||||
batch.events[-1].event_id,
|
||||
state_filter=state_filter,
|
||||
await_full_state=not lazy_load_members,
|
||||
)
|
||||
)
|
||||
|
||||
state_at_timeline_start = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[0].event_id, state_filter=state_filter
|
||||
batch.events[0].event_id,
|
||||
state_filter=state_filter,
|
||||
await_full_state=not lazy_load_members,
|
||||
)
|
||||
)
|
||||
|
||||
else:
|
||||
state_at_timeline_end = await self.get_state_at(
|
||||
room_id, stream_position=now_token, state_filter=state_filter
|
||||
room_id,
|
||||
stream_position=now_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=not lazy_load_members,
|
||||
)
|
||||
|
||||
state_at_timeline_start = state_at_timeline_end
|
||||
@@ -964,14 +1014,19 @@ class SyncHandler:
|
||||
if batch:
|
||||
state_at_timeline_start = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[0].event_id, state_filter=state_filter
|
||||
batch.events[0].event_id,
|
||||
state_filter=state_filter,
|
||||
await_full_state=not lazy_load_members,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# We can get here if the user has ignored the senders of all
|
||||
# the recent events.
|
||||
state_at_timeline_start = await self.get_state_at(
|
||||
room_id, stream_position=now_token, state_filter=state_filter
|
||||
room_id,
|
||||
stream_position=now_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=not lazy_load_members,
|
||||
)
|
||||
|
||||
# for now, we disable LL for gappy syncs - see
|
||||
@@ -993,20 +1048,28 @@ class SyncHandler:
|
||||
# is indeed the case.
|
||||
assert since_token is not None
|
||||
state_at_previous_sync = await self.get_state_at(
|
||||
room_id, stream_position=since_token, state_filter=state_filter
|
||||
room_id,
|
||||
stream_position=since_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=not lazy_load_members,
|
||||
)
|
||||
|
||||
if batch:
|
||||
state_at_timeline_end = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[-1].event_id, state_filter=state_filter
|
||||
batch.events[-1].event_id,
|
||||
state_filter=state_filter,
|
||||
await_full_state=not lazy_load_members,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# We can get here if the user has ignored the senders of all
|
||||
# the recent events.
|
||||
state_at_timeline_end = await self.get_state_at(
|
||||
room_id, stream_position=now_token, state_filter=state_filter
|
||||
room_id,
|
||||
stream_position=now_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=not lazy_load_members,
|
||||
)
|
||||
|
||||
state_ids = _calculate_state(
|
||||
@@ -1036,6 +1099,77 @@ class SyncHandler:
|
||||
(EventTypes.Member, member)
|
||||
for member in members_to_fetch
|
||||
),
|
||||
await_full_state=False,
|
||||
)
|
||||
|
||||
# If we only have partial state for the room, `state_ids` may be missing the
|
||||
# memberships we wanted. We attempt to find some by digging through the auth
|
||||
# events of timeline events.
|
||||
if lazy_load_members:
|
||||
assert members_to_fetch is not None
|
||||
|
||||
is_partial_state = await self.store.is_partial_state_room(room_id)
|
||||
if is_partial_state:
|
||||
additional_state_ids: MutableStateMap[str] = {}
|
||||
|
||||
# Tracks the missing members for logging purposes.
|
||||
missing_members = {}
|
||||
|
||||
# Pick out the auth events of timeline events whose sender
|
||||
# memberships are missing.
|
||||
auth_event_ids: Set[str] = set()
|
||||
for member, first_referencing_event in members_to_fetch.items():
|
||||
if (
|
||||
first_referencing_event is None
|
||||
or (EventTypes.Member, member) in state_ids
|
||||
):
|
||||
continue
|
||||
|
||||
missing_members[member] = first_referencing_event
|
||||
auth_event_ids.update(first_referencing_event.auth_event_ids())
|
||||
|
||||
auth_events = await self.store.get_events(auth_event_ids)
|
||||
|
||||
# Run through the events with missing sender memberships once more,
|
||||
# picking out the memberships from the pile of auth events we have
|
||||
# just fetched.
|
||||
for member, first_referencing_event in members_to_fetch.items():
|
||||
if (
|
||||
first_referencing_event is None
|
||||
or (EventTypes.Member, member) in state_ids
|
||||
):
|
||||
continue
|
||||
|
||||
# Dig through the auth events to find the sender's membership.
|
||||
for auth_event_id in first_referencing_event.auth_event_ids():
|
||||
# We only store events once we have all their auth events,
|
||||
# so the auth event must be in the pile we have just
|
||||
# fetched.
|
||||
auth_event = auth_events[auth_event_id]
|
||||
|
||||
if (
|
||||
auth_event.type == EventTypes.Member
|
||||
and auth_event.state_key == event.sender
|
||||
):
|
||||
missing_members.pop(member)
|
||||
additional_state_ids[
|
||||
(EventTypes.Member, event.sender)
|
||||
] = auth_event.event_id
|
||||
break
|
||||
|
||||
# Now merge in the state we have scrounged up.
|
||||
state_ids = {**state_ids, **additional_state_ids}
|
||||
|
||||
if missing_members:
|
||||
# There really shouldn't be any missing memberships now.
|
||||
# For an event to appear in the timeline, we must have its auth
|
||||
# events, which must include its sender's membership.
|
||||
logger.error(
|
||||
"Failed to find memberships for %s in partial state room "
|
||||
"%s in the auth events of %s.",
|
||||
list(missing_members.keys()),
|
||||
room_id,
|
||||
list(missing_members.values()),
|
||||
)
|
||||
|
||||
# At this point, if `lazy_load_members` is enabled, `state_ids` includes
|
||||
@@ -1730,7 +1864,11 @@ class SyncHandler:
|
||||
continue
|
||||
|
||||
if room_id in sync_result_builder.joined_room_ids or has_join:
|
||||
old_state_ids = await self.get_state_at(room_id, since_token)
|
||||
old_state_ids = await self.get_state_at(
|
||||
room_id,
|
||||
since_token,
|
||||
state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
|
||||
)
|
||||
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
|
||||
old_mem_ev = None
|
||||
if old_mem_ev_id:
|
||||
@@ -1756,7 +1894,13 @@ class SyncHandler:
|
||||
newly_left_rooms.append(room_id)
|
||||
else:
|
||||
if not old_state_ids:
|
||||
old_state_ids = await self.get_state_at(room_id, since_token)
|
||||
old_state_ids = await self.get_state_at(
|
||||
room_id,
|
||||
since_token,
|
||||
state_filter=StateFilter.from_types(
|
||||
[(EventTypes.Member, user_id)]
|
||||
),
|
||||
)
|
||||
old_mem_ev_id = old_state_ids.get(
|
||||
(EventTypes.Member, user_id), None
|
||||
)
|
||||
|
||||
@@ -234,6 +234,7 @@ class StateStorageController:
|
||||
self,
|
||||
event_ids: Collection[str],
|
||||
state_filter: Optional[StateFilter] = None,
|
||||
await_full_state: Optional[bool] = None,
|
||||
) -> Dict[str, StateMap[str]]:
|
||||
"""
|
||||
Get the state dicts corresponding to a list of events, containing the event_ids
|
||||
@@ -242,6 +243,9 @@ class StateStorageController:
|
||||
Args:
|
||||
event_ids: events whose state should be returned
|
||||
state_filter: The state filter used to fetch state from the database.
|
||||
await_full_state: if `True`, will block if we do not yet have complete state
|
||||
at these events. Defaults to `True` unless `state_filter` can be
|
||||
completely satisfied with partial state.
|
||||
|
||||
Returns:
|
||||
A dict from event_id -> (type, state_key) -> event_id
|
||||
@@ -250,9 +254,13 @@ class StateStorageController:
|
||||
RuntimeError if we don't have a state group for one or more of the events
|
||||
(ie they are outliers or unknown)
|
||||
"""
|
||||
await_full_state = True
|
||||
if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
|
||||
await_full_state = False
|
||||
if await_full_state is None:
|
||||
if state_filter and not state_filter.must_await_full_state(
|
||||
self._is_mine_id
|
||||
):
|
||||
await_full_state = False
|
||||
else:
|
||||
await_full_state = True
|
||||
|
||||
event_to_groups = await self.get_state_group_for_events(
|
||||
event_ids, await_full_state=await_full_state
|
||||
@@ -294,7 +302,10 @@ class StateStorageController:
|
||||
|
||||
@trace
|
||||
async def get_state_ids_for_event(
|
||||
self, event_id: str, state_filter: Optional[StateFilter] = None
|
||||
self,
|
||||
event_id: str,
|
||||
state_filter: Optional[StateFilter] = None,
|
||||
await_full_state: Optional[bool] = None,
|
||||
) -> StateMap[str]:
|
||||
"""
|
||||
Get the state dict corresponding to a particular event
|
||||
@@ -302,6 +313,9 @@ class StateStorageController:
|
||||
Args:
|
||||
event_id: event whose state should be returned
|
||||
state_filter: The state filter used to fetch state from the database.
|
||||
await_full_state: if `True`, will block if we do not yet have complete state
|
||||
at the event. Defaults to `True` unless `state_filter` can be completely
|
||||
satisfied with partial state.
|
||||
|
||||
Returns:
|
||||
A dict from (type, state_key) -> state_event_id
|
||||
@@ -311,7 +325,9 @@ class StateStorageController:
|
||||
outlier or is unknown)
|
||||
"""
|
||||
state_map = await self.get_state_ids_for_events(
|
||||
[event_id], state_filter or StateFilter.all()
|
||||
[event_id],
|
||||
state_filter or StateFilter.all(),
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
return state_map[event_id]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user