Compare commits

...

12 Commits

Author SHA1 Message Date
Andrew Morgan
88d8a7cd19 log invalidating 2020-05-07 15:33:57 +01:00
Andrew Morgan
14a9d59edf Up get_users_in_room cache size again 2020-05-07 15:08:52 +01:00
Andrew Morgan
fac30c0554 Don't async too many things 2020-05-07 15:00:02 +01:00
Andrew Morgan
908d15e904 Convert get_users_who_share_room_with_user to async/await 2020-05-07 14:56:01 +01:00
Andrew Morgan
0a1ebe5442 More logging, make get_users_who_share_room_with_user non-iterable 2020-05-07 14:46:47 +01:00
Andrew Morgan
9316091c8c Return get_users_in_room cache size 2020-05-07 14:26:04 +01:00
Andrew Morgan
7d54f2413f Increase get_users_in_room cache 2020-05-07 14:02:49 +01:00
Andrew Morgan
98a6910d94 Cache debugging 2020-05-07 13:57:11 +01:00
Andrew Morgan
c08273f529 undo more dbkr 2020-05-07 13:54:22 +01:00
Andrew Morgan
edca4cf1bb More debugging 2020-05-07 12:20:36 +01:00
Andrew Morgan
9d7fcb2fda Back out always telling user about their own devices 2020-05-07 12:06:32 +01:00
Andrew Morgan
1786b1ee0d Add debug logging 2020-05-07 11:58:28 +01:00
8 changed files with 31 additions and 17 deletions

View File

@@ -122,17 +122,17 @@ class DeviceWorkerHandler(BaseHandler):
# First we check if any devices have changed for users that we share
# rooms with.
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
users_who_share_room = yield defer.ensureDeferred(self.store.get_users_who_share_room_with_user(
user_id
)
))
tracked_users = set(users_who_share_room)
#tracked_users = set(users_who_share_room)
# Always tell the user about their own devices
tracked_users.add(user_id)
#tracked_users.add(user_id)
changed = yield self.store.get_users_whose_devices_changed(
from_token.device_list_key, tracked_users
from_token.device_list_key, users_who_share_room
)
# Then work out if any users have since joined
@@ -444,9 +444,10 @@ class DeviceHandler(DeviceWorkerHandler):
"""Notify that a user's device(s) has changed. Pokes the notifier, and
remote servers if the user is local.
"""
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
logger.info("get_users_who_share_room... called from notify_device_update")
users_who_share_room = yield defer.ensureDeferred(self.store.get_users_who_share_room_with_user(
user_id
)
))
hosts = set()
if self.hs.is_mine_id(user_id):

View File

@@ -172,6 +172,7 @@ class EventHandler(BaseHandler):
if not event:
return None
logger.info("get_users_in_room called from get_event!")
users = await self.store.get_users_in_room(event.room_id)
is_peeking = user.to_string() not in users

View File

@@ -1098,6 +1098,7 @@ class PresenceEventSource(object):
users_interested_in = set()
users_interested_in.add(user_id) # So that we receive our own presence
logger.info("get_users_who_share_room... _get_interested_in")
users_who_share_room = await self.store.get_users_who_share_room_with_user(
user_id, on_invalidate=cache_context.invalidate
)

View File

@@ -308,12 +308,14 @@ class SyncHandler(object):
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
logger.info("_wait_for_sync_for_user1")
result = await self.current_sync_for_user(
sync_config, since_token, full_state=full_state
)
else:
def current_sync_callback(before_token, after_token):
logger.info("_wait_for_sync_for_user2")
return self.current_sync_for_user(sync_config, since_token)
result = await self.notifier.wait_for_events(
@@ -340,6 +342,7 @@ class SyncHandler(object):
) -> SyncResult:
"""Get the sync for client needed to match what the server has now.
"""
logger.info("current_sync_for_user")
return await self.generate_sync_result(sync_config, since_token, full_state)
async def push_rules_for_user(self, user: UserID) -> JsonDict:
@@ -1139,6 +1142,8 @@ class SyncHandler(object):
# room with by looking at all users that have left a room plus users
# that were in a room we've left.
logger.info("get_users_who_share_room... called from _generate_sync_entry")
logger.info("*Called with %s", user_id)
users_who_share_room = await self.store.get_users_who_share_room_with_user(
user_id
)
@@ -1146,15 +1151,15 @@ class SyncHandler(object):
# Always tell the user about their own devices. We check as the user
# ID is almost certainly already included (unless they're not in any
# rooms) and taking a copy of the set is relatively expensive.
if user_id not in users_who_share_room:
users_who_share_room = set(users_who_share_room)
users_who_share_room.add(user_id)
#if user_id not in users_who_share_room:
# users_who_share_room = set(users_who_share_room)
# users_who_share_room.add(user_id)
tracked_users = users_who_share_room
#tracked_users = users_who_share_room
# Step 1a, check for changes in devices of users we share a room with
users_that_have_changed = await self.store.get_users_whose_devices_changed(
since_token.device_list_key, tracked_users
since_token.device_list_key, users_who_share_room
)
# Step 1b, check for newly joined rooms

View File

@@ -135,6 +135,7 @@ class SlavedEventStore(
)
if data.type == EventTypes.Member:
logger.info("INVALIDATING get_rooms_for_user_with_stream_ordering")
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
)

View File

@@ -171,6 +171,7 @@ class SyncRestServlet(RestServlet):
user.to_string(), affect_presence=affect_presence
)
with context:
logger.info("Sync servlet")
sync_result = await self.sync_handler.wait_for_sync_for_user(
sync_config,
since_token=since_token,

View File

@@ -605,6 +605,7 @@ class EventsStore(
}
for member in members_changed:
logger.info("INVALIDATING get_rooms_for_user_with_stream_ordering")
txn.call_after(
self.get_rooms_for_user_with_stream_ordering.invalidate, (member,)
)

View File

@@ -15,6 +15,7 @@
# limitations under the License.
import logging
import traceback
from typing import Iterable, List, Set
from six import iteritems, itervalues
@@ -163,8 +164,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
return hosts
@cached(max_entries=100000, iterable=True)
@cached(max_entries=1000000, iterable=True)
def get_users_in_room(self, room_id):
logger.info("Traceback: %s", traceback.format_stack())
return self.db.runInteraction(
"get_users_in_room", self.get_users_in_room_txn, room_id
)
@@ -484,17 +486,18 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
return frozenset(r.room_id for r in rooms)
@cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
def get_users_who_share_room_with_user(self, user_id, cache_context):
@cached(max_entries=500000, cache_context=True, iterable=True)
async def get_users_who_share_room_with_user(self, user_id, cache_context):
"""Returns the set of users who share a room with `user_id`
"""
room_ids = yield self.get_rooms_for_user(
logger.info("Called with %s %s", user_id, cache_context)
room_ids = await self.get_rooms_for_user(
user_id, on_invalidate=cache_context.invalidate
)
user_who_share_room = set()
for room_id in room_ids:
user_ids = yield self.get_users_in_room(
user_ids = await self.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
)
user_who_share_room.update(user_ids)