mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
11 Commits
shay/add_c
...
rei/userdi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4fdd64799e | ||
|
|
d4df71d857 | ||
|
|
fc6c5ae213 | ||
|
|
0db4dc8132 | ||
|
|
d49f230249 | ||
|
|
48a637a6ff | ||
|
|
1552fa44db | ||
|
|
d448469ea7 | ||
|
|
461cdb631f | ||
|
|
04d091fbcb | ||
|
|
ac566f45f6 |
1
changelog.d/15091.bugfix
Normal file
1
changelog.d/15091.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a long-standing bug in which the user directory would assume that any remote membership state event represents a profile change.
|
||||
@@ -283,6 +283,13 @@ class MockHomeserver:
|
||||
def get_replication_notifier(self) -> ReplicationNotifier:
|
||||
return ReplicationNotifier()
|
||||
|
||||
def get_user_directory_handler(self) -> object:
    """Return a no-op stand-in for the user directory handler.

    The returned object offers a single method,
    `kick_off_remote_profile_refresh_process`, which does nothing.
    """

    class FakeUserDirectoryHandler:
        """Inert handler: accepts the refresh kick-off and ignores it."""

        def kick_off_remote_profile_refresh_process(self) -> None:
            return None

    fake_handler = FakeUserDirectoryHandler()
    return fake_handler
|
||||
|
||||
|
||||
class Porter:
|
||||
def __init__(
|
||||
|
||||
@@ -422,6 +422,21 @@ class RemoteServerUpCommand(_SimpleCommand):
|
||||
NAME = "REMOTE_SERVER_UP"
|
||||
|
||||
|
||||
class ReadyToRefreshStaleUserDirectoryProfilesCommand(_SimpleCommand):
    """
    Tells the user directory worker that stale remote user profiles are
    waiting to be refreshed.

    A worker sends this once the user directory background update has been
    completed.

    Format::

        USER_DIRECTORY_READY_TO_REFRESH_STALE_REMOTE_PROFILES ''
    """

    NAME = "USER_DIRECTORY_READY_TO_REFRESH_STALE_REMOTE_PROFILES"
|
||||
|
||||
|
||||
_COMMANDS: Tuple[Type[Command], ...] = (
|
||||
ServerCommand,
|
||||
RdataCommand,
|
||||
@@ -435,6 +450,7 @@ _COMMANDS: Tuple[Type[Command], ...] = (
|
||||
UserIpCommand,
|
||||
RemoteServerUpCommand,
|
||||
ClearUserSyncsCommand,
|
||||
ReadyToRefreshStaleUserDirectoryProfilesCommand,
|
||||
)
|
||||
|
||||
# Map of command name to command type.
|
||||
@@ -448,6 +464,7 @@ VALID_SERVER_COMMANDS = (
|
||||
ErrorCommand.NAME,
|
||||
PingCommand.NAME,
|
||||
RemoteServerUpCommand.NAME,
|
||||
ReadyToRefreshStaleUserDirectoryProfilesCommand.NAME,
|
||||
)
|
||||
|
||||
# The commands the client is allowed to send
|
||||
@@ -461,6 +478,7 @@ VALID_CLIENT_COMMANDS = (
|
||||
UserIpCommand.NAME,
|
||||
ErrorCommand.NAME,
|
||||
RemoteServerUpCommand.NAME,
|
||||
ReadyToRefreshStaleUserDirectoryProfilesCommand.NAME,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -138,10 +138,15 @@ class BackgroundUpdateStartJobRestServlet(RestServlet):
|
||||
"populate_user_directory_process_rooms",
|
||||
),
|
||||
(
|
||||
"populate_user_directory_cleanup",
|
||||
"populate_user_directory_process_remote_users",
|
||||
"{}",
|
||||
"populate_user_directory_process_users",
|
||||
),
|
||||
(
|
||||
"populate_user_directory_cleanup",
|
||||
"{}",
|
||||
"populate_user_directory_process_remote_users",
|
||||
),
|
||||
]
|
||||
else:
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")
|
||||
|
||||
@@ -27,6 +27,10 @@ from typing import (
|
||||
cast,
|
||||
)
|
||||
|
||||
from synapse.replication.tcp.commands import (
|
||||
ReadyToRefreshStaleUserDirectoryProfilesCommand,
|
||||
)
|
||||
|
||||
try:
|
||||
# Figure out if ICU support is available for searching users.
|
||||
import icu
|
||||
@@ -91,17 +95,32 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||
)
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
"populate_user_directory_process_users",
|
||||
self._populate_user_directory_process_users,
|
||||
self._populate_user_directory_process_local_users,
|
||||
)
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
"populate_user_directory_process_remote_users",
|
||||
self._populate_user_directory_process_remote_users,
|
||||
)
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
"populate_user_directory_cleanup", self._populate_user_directory_cleanup
|
||||
)
|
||||
|
||||
@staticmethod
def _delete_staging_area(txn: LoggingTransaction) -> None:
    """Drop every staging table used while rebuilding the user directory.

    Each statement is a `DROP TABLE IF EXISTS`, so a table that was never
    created is not an error.

    Args:
        txn: the transaction in which to run the DROP statements.
    """
    # Same tables, in the same order, as the one-statement-per-table form.
    for table_suffix in (
        "_rooms",
        "_users",
        "_remote_users_needing_lookup",
        "_position",
    ):
        txn.execute(f"DROP TABLE IF EXISTS {TEMP_TABLE}{table_suffix}")
|
||||
|
||||
async def _populate_user_directory_createtables(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
# Get all the rooms that we want to process.
|
||||
def _make_staging_area(txn: LoggingTransaction) -> None:
|
||||
# Clear out any tables if they already exist beforehand.
|
||||
UserDirectoryBackgroundUpdateStore._delete_staging_area(txn)
|
||||
|
||||
sql = (
|
||||
"CREATE TABLE IF NOT EXISTS "
|
||||
+ TEMP_TABLE
|
||||
@@ -142,6 +161,18 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||
txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
|
||||
)
|
||||
|
||||
# A table for storing a list of remote users that *may* need a remote
|
||||
# lookup in order to obtain a public profile.
|
||||
# The list should be compared against the user directory's cache
|
||||
# to see whether any queries can be skipped because the remote user
|
||||
# also appeared in a public room.
|
||||
sql = (
|
||||
"CREATE TABLE IF NOT EXISTS "
|
||||
+ TEMP_TABLE
|
||||
+ "_remote_users_needing_lookup(user_id TEXT PRIMARY KEY NOT NULL)"
|
||||
)
|
||||
txn.execute(sql)
|
||||
|
||||
new_pos = await self.get_max_stream_id_in_current_state_deltas()
|
||||
await self.db_pool.runInteraction(
|
||||
"populate_user_directory_temp_build", _make_staging_area
|
||||
@@ -168,13 +199,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||
)
|
||||
await self.update_user_directory_stream_pos(position)
|
||||
|
||||
def _delete_staging_area(txn: LoggingTransaction) -> None:
|
||||
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
|
||||
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
|
||||
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"populate_user_directory_cleanup", _delete_staging_area
|
||||
"populate_user_directory_cleanup",
|
||||
UserDirectoryBackgroundUpdateStore._delete_staging_area,
|
||||
)
|
||||
|
||||
await self.db_pool.updates._end_background_update(
|
||||
@@ -262,10 +289,17 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||
or await self.should_include_local_user_in_dir(user_id)
|
||||
}
|
||||
|
||||
# Determine whether the room is public
|
||||
is_public = await self.is_room_world_readable_or_publicly_joinable(
|
||||
room_id
|
||||
)
|
||||
|
||||
remote_users_to_query_later = set()
|
||||
|
||||
# Upsert a user_directory record for each remote user we see.
|
||||
for user_id, profile in users_with_profile.items():
|
||||
# Local users are processed separately in
|
||||
# `_populate_user_directory_users`; there we can read from
|
||||
# `_populate_user_directory_local_users`; there we can read from
|
||||
# the `profiles` table to ensure we don't leak their per-room
|
||||
# profiles. It also means we write local users to this table
|
||||
# exactly once, rather than once for every room they're in.
|
||||
@@ -274,14 +308,29 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||
# TODO `users_with_profile` above reads from the `user_directory`
|
||||
# table, meaning that `profile` is bespoke to this room.
|
||||
# and this leaks remote users' per-room profiles to the user directory.
|
||||
await self.update_profile_in_user_dir(
|
||||
user_id, profile.display_name, profile.avatar_url
|
||||
)
|
||||
if is_public:
|
||||
# If this is a public room, it's acceptable to add the profile
|
||||
# into the user directory.
|
||||
await self.update_profile_in_user_dir(
|
||||
user_id, profile.display_name, profile.avatar_url
|
||||
)
|
||||
else:
|
||||
# Otherwise query the user at a later time
|
||||
remote_users_to_query_later.add(user_id)
|
||||
|
||||
# (insert the remote users needing a query in batch;
|
||||
# use upsert with no values for 'INSERT OR IGNORE' semantics)
|
||||
await self.db_pool.simple_upsert_many(
|
||||
f"{TEMP_TABLE}_remote_users_needing_lookup",
|
||||
("user_id",),
|
||||
[(u,) for u in remote_users_to_query_later],
|
||||
(),
|
||||
(),
|
||||
desc="populate_user_directory_queue_remote_needing_lookup",
|
||||
)
|
||||
del remote_users_to_query_later
|
||||
|
||||
# Now update the room sharing tables to include this room.
|
||||
is_public = await self.is_room_world_readable_or_publicly_joinable(
|
||||
room_id
|
||||
)
|
||||
if is_public:
|
||||
if users_with_profile:
|
||||
await self.add_users_in_public_rooms(
|
||||
@@ -336,7 +385,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||
|
||||
return processed_event_count
|
||||
|
||||
async def _populate_user_directory_process_users(
|
||||
async def _populate_user_directory_process_local_users(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""
|
||||
@@ -404,6 +453,114 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||
|
||||
return len(users_to_work_on)
|
||||
|
||||
async def _populate_user_directory_process_remote_users(
    self, progress: JsonDict, batch_size: int
) -> int:
    """
    Background update step that walks the `_remote_users_needing_lookup`
    staging table in `user_id` order and queues each user as a stale remote
    profile, skipping any user that already has a `user_directory` row
    (i.e. one that was also seen in a public room).

    Once an earlier step has stored `last_user_id` as `None`, the update is
    ended and the remote profile refresh is kicked off: directly when this
    process updates the user directory, otherwise via a replication command.

    Args:
        progress: persisted progress. `last_user_id` is the last user ID
            already handled; it is absent on the first run and `None` once
            the remaining rows have all been processed.
        batch_size: used as the OFFSET when picking the next upper bound.

    Returns:
        1 on the finishing call, otherwise `batch_size`.
    """
    update_name = "populate_user_directory_process_remote_users"

    # A *stored* None (as opposed to a missing key) marks that the previous
    # step had no upper bound left, so everything has been queued.
    if "last_user_id" in progress and progress["last_user_id"] is None:
        await self.db_pool.updates._end_background_update(update_name)

        # Now kick off querying remote homeservers for profile information.
        if self.hs.config.worker.should_update_user_directory:
            user_directory_handler = self.hs.get_user_directory_handler()
            user_directory_handler.kick_off_remote_profile_refresh_process()
        else:
            command_handler = self.hs.get_replication_command_handler()
            refresh_command = ReadyToRefreshStaleUserDirectoryProfilesCommand("")
            command_handler.send_command(refresh_command)

        return 1

    def _find_batch_end_txn(
        txn: LoggingTransaction, done_up_to_user_id: str
    ) -> Optional[str]:
        """
        Picks the inclusive upper bound for the next batch.

        Returns the user ID found `batch_size` rows past
        `done_up_to_user_id`, or `None` when no such row exists (meaning
        all remaining rows should be processed without an upper bound).
        """
        # Expected to be an index-only scan on the primary key, so the OFFSET
        # should be reasonably cheap; it may also warm the cache for the
        # query that does the real work.
        txn.execute(
            f"""
            SELECT user_id
            FROM {TEMP_TABLE}_remote_users_needing_lookup
            WHERE user_id > ?
            ORDER BY user_id
            LIMIT 1 OFFSET ?
            """,
            (done_up_to_user_id, batch_size),
        )
        found = txn.fetchone()
        return found[0] if found else None

    def _queue_users_without_directory_entry_txn(
        txn: LoggingTransaction, from_exc: str, until_inc: Optional[str]
    ) -> None:
        """
        Queues, as stale remote profiles, the staged users in the range
        (`from_exc`, `until_inc`] that have no `user_directory` row.
        A `until_inc` of `None` leaves the range open-ended.
        """
        if until_inc is None:
            upper_bound_clause = ""
            query_args: tuple = (from_exc,)
        else:
            upper_bound_clause = "AND user_id <= ?"
            query_args = (from_exc, until_inc)

        # SQL expression yielding everything after the first ':' of user_id.
        engine = self.database_engine
        server_name_expr: str
        if isinstance(engine, PostgresEngine):
            server_name_expr = "SUBSTRING(user_id FROM POSITION(':' IN user_id) + 1)"
        elif isinstance(engine, Sqlite3Engine):
            server_name_expr = "SUBSTR(user_id, INSTR(user_id, ':') + 1)"
        else:
            raise RuntimeError("Unknown database engine!")

        txn.execute(
            f"""
            INSERT INTO user_directory_stale_remote_users
                (user_id, next_try_at_ts, retry_counter, user_server_name)
            SELECT
                user_id, 0, 0, {server_name_expr}
            FROM {TEMP_TABLE}_remote_users_needing_lookup AS runl
            LEFT JOIN user_directory AS ud USING (user_id)
            WHERE ud.user_id IS NULL
                AND ? < user_id {upper_bound_clause}
            """,
            query_args,
        )

    def _run_step_txn(txn: LoggingTransaction) -> None:
        """
        Does a step of background update.
        """
        # "@" is the starting point when nothing has been processed yet
        # (presumably it sorts before every real user ID -- confirm).
        resume_after = progress.get("last_user_id", "@")
        batch_end = _find_batch_end_txn(txn, resume_after)
        _queue_users_without_directory_entry_txn(txn, resume_after, batch_end)

        # Persist how far we got; None here ends the update on the next call.
        progress["last_user_id"] = batch_end
        self.db_pool.updates._background_update_progress_txn(
            txn, update_name, progress
        )

    await self.db_pool.runInteraction(update_name, _run_step_txn)
    return batch_size
|
||||
|
||||
async def should_include_local_user_in_dir(self, user: str) -> bool:
|
||||
"""Certain classes of local user are omitted from the user directory.
|
||||
Is this user one of them?
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
/* Copyright 2023 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Rebuild the user directory in light of the fix for leaking the per-room
|
||||
-- profiles of remote users to the user directory.
|
||||
|
||||
-- Step 1: drop any rebuild that is already queued, so that the schedule below
-- always starts from scratch.
DELETE FROM background_updates
WHERE update_name IN (
    'populate_user_directory_createtables',
    'populate_user_directory_process_rooms',
    'populate_user_directory_process_users',
    'populate_user_directory_process_remote_users',
    'populate_user_directory_cleanup'
);

-- Step 2: queue the rebuild. Each row depends on the one before it, so the
-- steps run strictly in the order listed.
INSERT INTO background_updates
    (ordering, update_name, progress_json, depends_on)
VALUES
    -- Create the user directory staging tables.
    (7402, 'populate_user_directory_createtables', '{}', NULL),

    -- Visit each room and update the user directory from its membership.
    (7402, 'populate_user_directory_process_rooms', '{}',
        'populate_user_directory_createtables'),

    -- Add all users to the user directory, if search_all_users is on.
    (7402, 'populate_user_directory_process_users', '{}',
        'populate_user_directory_process_rooms'),

    -- Queue the remote users whose profiles need fetching.
    (7402, 'populate_user_directory_process_remote_users', '{}',
        'populate_user_directory_process_users'),

    -- Drop the user directory staging tables.
    (7402, 'populate_user_directory_cleanup', '{}',
        'populate_user_directory_process_remote_users')
ON CONFLICT (update_name) DO NOTHING;
|
||||
@@ -1128,9 +1128,10 @@ class UserDirectoryRemoteProfileTestCase(unittest.HomeserverTestCase):
|
||||
self.user_dir_handler = hs.get_user_directory_handler()
|
||||
self.profile_handler = hs.get_profile_handler()
|
||||
|
||||
# Cancel the startup call: in the steady-state case we can't rely on it anyway.
|
||||
assert self.user_dir_handler._refresh_remote_profiles_call_later is not None
|
||||
self.user_dir_handler._refresh_remote_profiles_call_later.cancel()
|
||||
if self.user_dir_handler._refresh_remote_profiles_call_later is not None:
|
||||
# Cancel the startup call: in the steady-state case we can't rely on
|
||||
# it anyway.
|
||||
self.user_dir_handler._refresh_remote_profiles_call_later.cancel()
|
||||
|
||||
def test_public_rooms_have_profiles_collected(self) -> None:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user