Compare commits

...

3 Commits

Author SHA1 Message Date
Erik Johnston
80db47ea40 Fixup 2023-07-20 13:26:14 +01:00
Erik Johnston
38f16214ec Newsfile 2023-07-20 13:21:58 +01:00
Erik Johnston
c0b02ed93e Speed up calculation of remote hosts 2023-07-20 13:19:16 +01:00
4 changed files with 88 additions and 5 deletions

1
changelog.d/15970.misc Normal file
View File

@@ -0,0 +1 @@
Speed up calculation of remote hosts.

View File

@@ -1565,12 +1565,14 @@ class EventCreationHandler:
if state_entry.state_group in self._external_cache_joined_hosts_updates:
return
state = await state_entry.get_state(
await state_entry.get_state(
self._storage_controllers.state, StateFilter.all()
)
with opentracing.start_active_span("get_joined_hosts"):
joined_hosts = await self.store.get_joined_hosts(
event.room_id, state, state_entry
stream_ordering = event.internal_metadata.stream_ordering
assert stream_ordering is not None
joined_hosts = await self._storage_controllers.state.get_joined_remote_hosts_for_event(
event.room_id, event.event_id, stream_ordering
)
# Note that the expiry times must be larger than the expiry time in

View File

@@ -23,10 +23,11 @@ from typing import (
List,
Mapping,
Optional,
Set,
Tuple,
)
from synapse.api.constants import EventTypes
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
from synapse.storage.roommember import ProfileInfo
@@ -34,7 +35,7 @@ from synapse.storage.util.partial_state_events_tracker import (
PartialCurrentStateTracker,
PartialStateEventsTracker,
)
from synapse.types import MutableStateMap, StateMap
from synapse.types import MutableStateMap, StateMap, get_domain_from_id
from synapse.types.state import StateFilter
from synapse.util.cancellation import cancellable
@@ -627,3 +628,56 @@ class StateStorageController:
await self._partial_state_room_tracker.await_full_state(room_id)
return await self.stores.main.get_users_in_room_with_profiles(room_id)
async def get_joined_remote_hosts_for_event(
self,
room_id: str,
event_id: str,
stream_ordering: int,
) -> Set[str]:
"""Get the remote hosts that are in the room at the given event / stream ordering"""
joined_users = set(await self.stores.main.get_users_in_room(room_id))
changed_users: AbstractSet[
str
] = await self.stores.main.get_changed_remote_users_after_event(
room_id, stream_ordering
)
known_joined_hosts = {
get_domain_from_id(u)
for u in joined_users - changed_users
if not self._is_mine_id(u)
}
if not changed_users:
return known_joined_hosts
potentially_changed_hosts = {get_domain_from_id(u) for u in changed_users}
if not potentially_changed_hosts - known_joined_hosts:
return known_joined_hosts
changed_users = {
user_id
for user_id in changed_users
if get_domain_from_id(user_id) not in known_joined_hosts
}
state_ids = await self.get_state_ids_for_event(
event_id,
StateFilter.from_types(
[(EventTypes.Member, user_id) for user_id in changed_users]
),
)
membership_map = await self.stores.main.get_membership_from_event_ids(
state_ids.values()
)
known_joined_hosts.update(
user_id
for user_id, membership in membership_map.items()
if membership and membership.membership == Membership.JOIN
)
return known_joined_hosts

View File

@@ -1057,6 +1057,32 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"get_current_hosts_in_room_ordered", get_current_hosts_in_room_ordered_txn
)
async def get_changed_remote_users_after_event(
self, room_id: str, stream_ordering: int
) -> FrozenSet[str]:
"""Get the users in the room that may have changed since the stream
ordering."""
return await self.db_pool.runInteraction(
"get_changed_remote_users_after_event",
self._get_changed_remote_users_after_event_txn,
room_id,
stream_ordering,
)
def _get_changed_remote_users_after_event_txn(
self, txn: LoggingTransaction, room_id: str, stream_ordering: int
) -> FrozenSet[str]:
sql = """
SELECT state_key
FROM current_state_delta_stream
WHERE room_id = ? AND stream_ordering >= ? AND type = ?
GROUP BY state_key
"""
txn.execute(sql, (room_id, stream_ordering, EventTypes.Member))
return frozenset(user_id for user_id, in txn if not self.hs.is_mine_id(user_id))
async def get_joined_hosts(
self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry"
) -> FrozenSet[str]: