Compare commits

...

8 Commits

Author SHA1 Message Date
Sean Quah
6fd9f45619 Merge remote-tracking branch 'origin/develop' into squah/faster_room_joins_unblock_lazy_loading_sync_2 2022-08-17 16:18:39 +01:00
Sean Quah
ad8a2b3c6e Add newsfile 2022-08-10 13:08:02 +01:00
Sean Quah
04bee9e709 Dig up memberships for lazy-loading syncs in partial state rooms
Signed-off-by: Sean Quah <seanq@matrix.org>
2022-08-10 13:08:02 +01:00
Sean Quah
5274c8779b Do not wait for full state in compute_state_delta
Signed-off-by: Sean Quah <seanq@matrix.org>
2022-08-10 13:08:02 +01:00
Sean Quah
10013eaacc Do not wait for full state in a few cases in _get_rooms_changed 2022-08-10 13:08:02 +01:00
Sean Quah
ad8bab8548 Do not wait for full state in a few cases in _load_filtered_recents 2022-08-10 13:08:02 +01:00
Sean Quah
0a539b9b57 Add option to get_state_at/after_event to return partial state
Signed-off-by: Sean Quah <seanq@matrix.org>
2022-08-10 12:59:01 +01:00
Sean Quah
31a2e5e417 Add option for get_state_ids_for_event(s) to return partial state
Signed-off-by: Sean Quah <seanq@matrix.org>
2022-08-10 12:59:01 +01:00
3 changed files with 197 additions and 36 deletions

1
changelog.d/13477.misc Normal file
View File

@@ -0,0 +1 @@
Faster room joins: Avoid blocking lazy-loading `/sync`s during partial joins due to remote memberships. Pull remote memberships from auth events instead of the room state.

View File

@@ -517,10 +517,17 @@ class SyncHandler:
# ensure that we always include current state in the timeline
current_state_ids: FrozenSet[str] = frozenset()
if any(e.is_state() for e in recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Which should be fine once
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
# since we shouldn't reach here anymore?
# Note that we use the current state as a whitelist for filtering
# `recents`, so partial state is only a problem when a membership
# event turns up in `recents` but has not made it into the current
# state.
current_state_ids_map = (
await self._state_storage_controller.get_current_state_ids(
room_id
)
await self.store.get_partial_current_state_ids(room_id)
)
current_state_ids = frozenset(current_state_ids_map.values())
@@ -589,7 +596,13 @@ class SyncHandler:
if any(e.is_state() for e in loaded_recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Is this the correct way of doing it?
# Which should be fine once
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
# since we shouldn't reach here anymore?
# Note that we use the current state as a whitelist for filtering
# `loaded_recents`, so partial state is only a problem when a
# membership event turns up in `loaded_recents` but has not made it
# into the current state.
current_state_ids_map = (
await self.store.get_partial_current_state_ids(room_id)
)
@@ -637,7 +650,10 @@ class SyncHandler:
)
async def get_state_after_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
self,
event_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: Optional[bool] = None,
) -> StateMap[str]:
"""
Get the room state after the given event
@@ -645,9 +661,14 @@ class SyncHandler:
Args:
event_id: event of interest
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the event. Defaults to `True` unless `state_filter` can be completely
satisfied with partial state.
"""
state_ids = await self._state_storage_controller.get_state_ids_for_event(
event_id, state_filter=state_filter or StateFilter.all()
event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)
# using get_metadata_for_events here (instead of get_event) sidesteps an issue
@@ -670,6 +691,7 @@ class SyncHandler:
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
await_full_state: Optional[bool] = None,
) -> StateMap[str]:
"""Get the room state at a particular stream position
@@ -677,6 +699,10 @@ class SyncHandler:
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the last event in the room before `stream_position`. Defaults to
`True` unless `state_filter` can be completely satisfied with partial
state.
"""
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
@@ -688,7 +714,9 @@ class SyncHandler:
if last_event_id:
state = await self.get_state_after_event(
last_event_id, state_filter=state_filter or StateFilter.all()
last_event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)
else:
@@ -890,8 +918,14 @@ class SyncHandler:
with Measure(self.clock, "compute_state_delta"):
# The memberships needed for events in the timeline.
# A dictionary with user IDs as keys and the first event in the timeline
# requiring each member as values.
# Only calculated when `lazy_load_members` is on.
members_to_fetch = None
members_to_fetch: Optional[Dict[str, Optional[EventBase]]] = None
# The contribution to the room state from state events in the timeline.
# Only contains the last event for any given state key.
timeline_state: StateMap[str]
lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
@@ -902,29 +936,38 @@ class SyncHandler:
# We only request state for the members needed to display the
# timeline:
members_to_fetch = {
event.sender # FIXME: we also care about invite targets etc.
for event in batch.events
}
timeline_state = {}
members_to_fetch = {}
for event in batch.events:
# We need the event's sender, unless their membership was in a
# previous timeline event.
if (
EventTypes.Member,
event.sender,
) not in timeline_state and event.sender not in members_to_fetch:
members_to_fetch[event.sender] = event
# FIXME: we also care about invite targets etc.
if event.is_state():
timeline_state[(event.type, event.state_key)] = event.event_id
if full_state:
# always make sure we LL ourselves so we know we're in the room
# (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
# We only need apply this on full state syncs given we disabled
# LL for incr syncs in #3840.
members_to_fetch.add(sync_config.user.to_string())
members_to_fetch[sync_config.user.to_string()] = None
state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
else:
state_filter = StateFilter.all()
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events
if event.is_state()
}
# The contribution to the room state from state events in the timeline.
# Only contains the last event for any given state key.
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events
if event.is_state()
}
state_filter = StateFilter.all()
# Now calculate the state to return in the sync response for the room.
# This is more or less the change in state between the end of the previous
@@ -936,19 +979,26 @@ class SyncHandler:
if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
)
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
)
else:
state_at_timeline_end = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
state_at_timeline_start = state_at_timeline_end
@@ -964,14 +1014,19 @@ class SyncHandler:
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
# for now, we disable LL for gappy syncs - see
@@ -993,20 +1048,28 @@ class SyncHandler:
# is indeed the case.
assert since_token is not None
state_at_previous_sync = await self.get_state_at(
room_id, stream_position=since_token, state_filter=state_filter
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_end = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=not lazy_load_members,
)
state_ids = _calculate_state(
@@ -1036,6 +1099,77 @@ class SyncHandler:
(EventTypes.Member, member)
for member in members_to_fetch
),
await_full_state=False,
)
# If we only have partial state for the room, `state_ids` may be missing the
# memberships we wanted. We attempt to find some by digging through the auth
# events of timeline events.
if lazy_load_members:
assert members_to_fetch is not None
is_partial_state = await self.store.is_partial_state_room(room_id)
if is_partial_state:
additional_state_ids: MutableStateMap[str] = {}
# Tracks the missing members for logging purposes.
missing_members = {}
# Pick out the auth events of timeline events whose sender
# memberships are missing.
auth_event_ids: Set[str] = set()
for member, first_referencing_event in members_to_fetch.items():
if (
first_referencing_event is None
or (EventTypes.Member, member) in state_ids
):
continue
missing_members[member] = first_referencing_event
auth_event_ids.update(first_referencing_event.auth_event_ids())
auth_events = await self.store.get_events(auth_event_ids)
# Run through the events with missing sender memberships once more,
# picking out the memberships from the pile of auth events we have
# just fetched.
for member, first_referencing_event in members_to_fetch.items():
if (
first_referencing_event is None
or (EventTypes.Member, member) in state_ids
):
continue
# Dig through the auth events to find the sender's membership.
for auth_event_id in first_referencing_event.auth_event_ids():
# We only store events once we have all their auth events,
# so the auth event must be in the pile we have just
# fetched.
auth_event = auth_events[auth_event_id]
if (
auth_event.type == EventTypes.Member
and auth_event.state_key == event.sender
):
missing_members.pop(member)
additional_state_ids[
(EventTypes.Member, event.sender)
] = auth_event.event_id
break
# Now merge in the state we have scrounged up.
state_ids = {**state_ids, **additional_state_ids}
if missing_members:
# There really shouldn't be any missing memberships now.
# For an event to appear in the timeline, we must have its auth
# events, which must include its sender's membership.
logger.error(
"Failed to find memberships for %s in partial state room "
"%s in the auth events of %s.",
list(missing_members.keys()),
room_id,
list(missing_members.values()),
)
# At this point, if `lazy_load_members` is enabled, `state_ids` includes
@@ -1730,7 +1864,11 @@ class SyncHandler:
continue
if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
old_mem_ev = None
if old_mem_ev_id:
@@ -1756,7 +1894,13 @@ class SyncHandler:
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types(
[(EventTypes.Member, user_id)]
),
)
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
)

View File

@@ -234,6 +234,7 @@ class StateStorageController:
self,
event_ids: Collection[str],
state_filter: Optional[StateFilter] = None,
await_full_state: Optional[bool] = None,
) -> Dict[str, StateMap[str]]:
"""
Get the state dicts corresponding to a list of events, containing the event_ids
@@ -242,6 +243,9 @@ class StateStorageController:
Args:
event_ids: events whose state should be returned
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at these events. Defaults to `True` unless `state_filter` can be
completely satisfied with partial state.
Returns:
A dict from event_id -> (type, state_key) -> event_id
@@ -250,9 +254,13 @@ class StateStorageController:
RuntimeError if we don't have a state group for one or more of the events
(ie they are outliers or unknown)
"""
await_full_state = True
if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
await_full_state = False
if await_full_state is None:
if state_filter and not state_filter.must_await_full_state(
self._is_mine_id
):
await_full_state = False
else:
await_full_state = True
event_to_groups = await self.get_state_group_for_events(
event_ids, await_full_state=await_full_state
@@ -294,7 +302,10 @@ class StateStorageController:
@trace
async def get_state_ids_for_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
self,
event_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: Optional[bool] = None,
) -> StateMap[str]:
"""
Get the state dict corresponding to a particular event
@@ -302,6 +313,9 @@ class StateStorageController:
Args:
event_id: event whose state should be returned
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the event. Defaults to `True` unless `state_filter` can be completely
satisfied with partial state.
Returns:
A dict from (type, state_key) -> state_event_id
@@ -311,7 +325,9 @@ class StateStorageController:
outlier or is unknown)
"""
state_map = await self.get_state_ids_for_events(
[event_id], state_filter or StateFilter.all()
[event_id],
state_filter or StateFilter.all(),
await_full_state=await_full_state,
)
return state_map[event_id]