Compare commits

...

6 Commits

Author SHA1 Message Date
Andrew Morgan
f3e4cf3758 Fix get_users_who_share_room_with_user SQL
Three fixes here:

* Using UNION only selects DISTINCT rows from each query. However, if we DISTINCT the rows during the query, and then use UNION ALL which doesn't attempt to select distinct rows, we get a 1/3rd speedup in query time!
* We should select other_user_id instead of user_id from users_who_share_private_rooms, as user_id will always be the requesting user.
* Added p1.user_id != p2.user_id to filter out the same entries between each table in the users_in_public_rooms query.
2021-02-15 17:41:26 +00:00
Andrew Morgan
32e41cc8e0 Add instead of update requesting user_id to set; invalidate cache context
We were set.update'ing a user_id, instead of set.add. The former treats
user_id as an iterable, and thus adds every individual letter of the
requesting user to the set. Fun!
2021-02-15 17:33:35 +00:00
Andrew Morgan
27b0a44a18 Fix server name typo 2021-02-12 22:12:56 +00:00
Andrew Morgan
e10b3b377b Changelog 2021-02-12 20:11:38 +00:00
Andrew Morgan
72c1e61f41 Remove duplicate where we filter ourselves out of destinations to send presence to
send_presence_to_destinations will already filter out the current server from the
given list. No need to do it again beforehand.
2021-02-12 20:11:37 +00:00
Andrew Morgan
c57f436515 Speed up get_users_who_share_room_with_user with a more efficient query
The old code was pulling all rooms for a given user, *then* looping over each
one in Python and pulling all users from those rooms.

This commit replaces that with a single query which makes use of existing
tables which keep track of user relationships.
2021-02-12 20:11:10 +00:00
5 changed files with 36 additions and 24 deletions

1
changelog.d/9399.misc Normal file
View File

@@ -0,0 +1 @@
Optimise some code behind the user presence feature.

View File

@@ -472,7 +472,7 @@ class FederationSender:
self._processing_pending_presence = False
def send_presence_to_destinations(
self, states: List[UserPresenceState], destinations: List[str]
self, states: List[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
destinations (list[str])

View File

@@ -909,9 +909,6 @@ class PresenceHandler(BasePresenceHandler):
state = await self.current_state_for_user(user_id)
hosts = await self.state.get_current_hosts_in_room(room_id)
# Filter out ourselves.
hosts = {host for host in hosts if host != self.server_name}
self.federation.send_presence_to_destinations(
states=[state], destinations=hosts
)
@@ -1115,13 +1112,11 @@ class PresenceEventSource:
updates for
"""
user_id = user.to_string()
users_interested_in = set()
users_interested_in.add(user_id) # So that we receive our own presence
users_who_share_room = await self.store.get_users_who_share_room_with_user(
users_interested_in = await self.store.get_users_who_share_room_with_user(
user_id, on_invalidate=cache_context.invalidate
)
users_interested_in.update(users_who_share_room)
users_interested_in.add(user_id) # So that we receive our own presence
if explicit_room_id:
user_ids = await self.store.get_users_in_room(

View File

@@ -37,7 +37,7 @@ from synapse.storage.roommember import (
from synapse.types import Collection, PersistedEventPosition, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@@ -484,24 +484,36 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
return frozenset(r.room_id for r in rooms)
@cached(max_entries=500000, cache_context=True, iterable=True)
async def get_users_who_share_room_with_user(
self, user_id: str, cache_context: _CacheContext
) -> Set[str]:
@cached(max_entries=500000, iterable=True)
async def get_users_who_share_room_with_user(self, user_id: str) -> Set[str]:
"""Returns the set of users who share a room with `user_id`
"""
room_ids = await self.get_rooms_for_user(
user_id, on_invalidate=cache_context.invalidate
def _get_users_who_share_room_with_user(txn):
txn.execute(
"""
SELECT DISTINCT p2.user_id
FROM users_in_public_rooms as p1
INNER JOIN users_in_public_rooms as p2
ON p1.room_id = p2.room_id
AND p1.user_id != p2.user_id
AND p1.user_id = ?
UNION ALL
SELECT DISTINCT other_user_id
FROM users_who_share_private_rooms
WHERE
user_id = ?
""",
(user_id, user_id),
)
rows = self.db_pool.cursor_to_dict(txn)
return rows
rows = await self.db_pool.runInteraction(
"get_users_who_share_room_with_user", _get_users_who_share_room_with_user
)
user_who_share_room = set()
for room_id in room_ids:
user_ids = await self.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
)
user_who_share_room.update(user_ids)
return user_who_share_room
return {row.get("user_id") or row.get("other_user_id") for row in rows}
async def get_joined_users_from_context(
self, event: EventBase, context: EventContext

View File

@@ -585,7 +585,11 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations={"server2", "server3"}, states=[expected_state]
# "server" is included here as it appears when getting the current hosts for
# the room. send_presence_to_destinations will remove the host server before
# sending out presence.
destinations=frozenset({"server", "server2", "server3"}),
states=[expected_state],
)
def _add_new_user(self, room_id, user_id):