Move registrations off the main worker (#18552)

This is mainly moving a few store methods around. Note that this doesn't
yet remove the replication servlet to avoid breaking during rollout.
This commit is contained in:
Quentin Gliech
2025-07-10 15:13:27 +02:00
committed by GitHub
parent 66daf0bfae
commit 1dc29563c1
6 changed files with 182 additions and 189 deletions

1
changelog.d/18552.misc Normal file
View File

@@ -0,0 +1 @@
Allow user registrations to be done on workers.

View File

@@ -117,6 +117,14 @@ each upgrade are complete before moving on to the next upgrade, to avoid
stacking them up. You can monitor the currently running background updates with
[the Admin API](usage/administration/admin_api/background_updates.html#status).
# Upgrading to v1.135.0
## `on_user_registration` module API callback may now run on any worker
Previously, the `on_user_registration` callback would only run on the main
process. Modules relying on this callback must assume that they may now be
called from any worker, not just the main process.
# Upgrading to v1.134.0
## ICU bundled with Synapse

View File

@@ -118,7 +118,6 @@ class GenericWorkerStore(
# FIXME(https://github.com/matrix-org/synapse/issues/3714): We need to add
# UserDirectoryStore as we write directly rather than going via the correct worker.
UserDirectoryStore,
StatsStore,
UIAuthWorkerStore,
EndToEndRoomKeyStore,
PresenceStore,
@@ -154,6 +153,7 @@ class GenericWorkerStore(
StreamWorkerStore,
EventsWorkerStore,
RegistrationWorkerStore,
StatsStore,
SearchStore,
TransactionWorkerStore,
LockStore,

View File

@@ -49,7 +49,6 @@ from synapse.http.servlet import assert_params_in_dict
from synapse.replication.http.login import RegisterDeviceReplicationServlet
from synapse.replication.http.register import (
ReplicationPostRegisterActionsServlet,
ReplicationRegisterServlet,
)
from synapse.spam_checker_api import RegistrationBehaviour
from synapse.types import GUEST_USER_ID_PATTERN, RoomAlias, UserID, create_requester
@@ -120,7 +119,6 @@ class RegistrationHandler:
self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
if hs.config.worker.worker_app:
self._register_client = ReplicationRegisterServlet.make_client(hs)
self._register_device_client = RegisterDeviceReplicationServlet.make_client(
hs
)
@@ -738,37 +736,20 @@ class RegistrationHandler:
shadow_banned: Whether to shadow-ban the user
approved: Whether to mark the user as approved by an administrator
"""
if self.hs.config.worker.worker_app:
await self._register_client(
user_id=user_id,
password_hash=password_hash,
was_guest=was_guest,
make_guest=make_guest,
appservice_id=appservice_id,
create_profile_with_displayname=create_profile_with_displayname,
admin=admin,
user_type=user_type,
address=address,
shadow_banned=shadow_banned,
approved=approved,
)
else:
await self.store.register_user(
user_id=user_id,
password_hash=password_hash,
was_guest=was_guest,
make_guest=make_guest,
appservice_id=appservice_id,
create_profile_with_displayname=create_profile_with_displayname,
admin=admin,
user_type=user_type,
shadow_banned=shadow_banned,
approved=approved,
)
await self.store.register_user(
user_id=user_id,
password_hash=password_hash,
was_guest=was_guest,
make_guest=make_guest,
appservice_id=appservice_id,
create_profile_with_displayname=create_profile_with_displayname,
admin=admin,
user_type=user_type,
shadow_banned=shadow_banned,
approved=approved,
)
# Only call the account validity module(s) on the main process, to avoid
# repeating e.g. database writes on all of the workers.
await self._account_validity_handler.on_user_registration(user_id)
await self._account_validity_handler.on_user_registration(user_id)
async def register_device(
self,

View File

@@ -33,6 +33,8 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# FIXME(2025-07-22): Remove this on the next release, this may only be used
# during rollout to Synapse 1.134 and can be removed after that release.
class ReplicationRegisterServlet(ReplicationEndpoint):
"""Register a new user"""

View File

@@ -175,7 +175,7 @@ class ThreepidValidationSession:
"""timestamp of when this session was validated if so"""
class RegistrationWorkerStore(CacheInvalidationWorkerStore):
class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore):
def __init__(
self,
database: DatabasePool,
@@ -217,12 +217,167 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
self._set_expiration_date_when_missing,
)
# If support for MSC3866 is enabled and configured to require approval for new
# account, we will create new users with an 'approved' flag set to false.
self._require_approval = (
hs.config.experimental.msc3866.enabled
and hs.config.experimental.msc3866.require_approval_for_new_accounts
)
# Create a background job for culling expired 3PID validity tokens
if hs.config.worker.run_background_tasks:
self._clock.looping_call(
self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
)
async def register_user(
self,
user_id: str,
password_hash: Optional[str] = None,
was_guest: bool = False,
make_guest: bool = False,
appservice_id: Optional[str] = None,
create_profile_with_displayname: Optional[str] = None,
admin: bool = False,
user_type: Optional[str] = None,
shadow_banned: bool = False,
approved: bool = False,
) -> None:
"""Attempts to register an account.
Args:
user_id: The desired user ID to register.
password_hash: Optional. The password hash for this user.
was_guest: Whether this is a guest account being upgraded to a
non-guest account.
make_guest: True if the the new user should be guest, false to add a
regular user account.
appservice_id: The ID of the appservice registering the user.
create_profile_with_displayname: Optionally create a profile for
the user, setting their displayname to the given value
admin: is an admin user?
user_type: type of user. One of the values from api.constants.UserTypes,
a custom value set in the configuration file, or None for a normal
user.
shadow_banned: Whether the user is shadow-banned, i.e. they may be
told their requests succeeded but we ignore them.
approved: Whether to consider the user has already been approved by an
administrator.
Raises:
StoreError if the user_id could not be registered.
"""
await self.db_pool.runInteraction(
"register_user",
self._register_user,
user_id,
password_hash,
was_guest,
make_guest,
appservice_id,
create_profile_with_displayname,
admin,
user_type,
shadow_banned,
approved,
)
def _register_user(
self,
txn: LoggingTransaction,
user_id: str,
password_hash: Optional[str],
was_guest: bool,
make_guest: bool,
appservice_id: Optional[str],
create_profile_with_displayname: Optional[str],
admin: bool,
user_type: Optional[str],
shadow_banned: bool,
approved: bool,
) -> None:
user_id_obj = UserID.from_string(user_id)
now = int(self._clock.time())
user_approved = approved or not self._require_approval
try:
if was_guest:
# Ensure that the guest user actually exists
# ``allow_none=False`` makes this raise an exception
# if the row isn't in the database.
self.db_pool.simple_select_one_txn(
txn,
"users",
keyvalues={"name": user_id, "is_guest": 1},
retcols=("name",),
allow_none=False,
)
self.db_pool.simple_update_one_txn(
txn,
"users",
keyvalues={"name": user_id, "is_guest": 1},
updatevalues={
"password_hash": password_hash,
"upgrade_ts": now,
"is_guest": 1 if make_guest else 0,
"appservice_id": appservice_id,
"admin": 1 if admin else 0,
"user_type": user_type,
"shadow_banned": shadow_banned,
"approved": user_approved,
},
)
else:
self.db_pool.simple_insert_txn(
txn,
"users",
values={
"name": user_id,
"password_hash": password_hash,
"creation_ts": now,
"is_guest": 1 if make_guest else 0,
"appservice_id": appservice_id,
"admin": 1 if admin else 0,
"user_type": user_type,
"shadow_banned": shadow_banned,
"approved": user_approved,
},
)
except self.database_engine.module.IntegrityError:
raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE)
if self._account_validity_enabled:
self.set_expiration_date_for_user_txn(txn, user_id)
if create_profile_with_displayname:
# set a default displayname serverside to avoid ugly race
# between auto-joins and clients trying to set displaynames
#
# *obviously* the 'profiles' table uses localpart for user_id
# while everything else uses the full mxid.
txn.execute(
"INSERT INTO profiles(full_user_id, user_id, displayname) VALUES (?,?,?)",
(user_id, user_id_obj.localpart, create_profile_with_displayname),
)
if self.hs.config.stats.stats_enabled:
# we create a new completed user statistics row
# we don't strictly need current_token since this user really can't
# have any state deltas before now (as it is a new user), but still,
# we include it for completeness.
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
self._update_stats_delta_txn(
txn, now, "user", user_id, {}, complete_with_stream_id=current_token
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
@cached()
async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]:
"""Returns info about the user account, if it exists."""
@@ -2354,7 +2509,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
return nb_processed
class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
class RegistrationStore(RegistrationBackgroundUpdateStore):
def __init__(
self,
database: DatabasePool,
@@ -2370,13 +2525,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
# If support for MSC3866 is enabled and configured to require approval for new
# account, we will create new users with an 'approved' flag set to false.
self._require_approval = (
hs.config.experimental.msc3866.enabled
and hs.config.experimental.msc3866.require_approval_for_new_accounts
)
# Create a background job for removing expired login tokens
if hs.config.worker.run_background_tasks:
self._clock.looping_call(
@@ -2524,153 +2672,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
device_id,
)
async def register_user(
self,
user_id: str,
password_hash: Optional[str] = None,
was_guest: bool = False,
make_guest: bool = False,
appservice_id: Optional[str] = None,
create_profile_with_displayname: Optional[str] = None,
admin: bool = False,
user_type: Optional[str] = None,
shadow_banned: bool = False,
approved: bool = False,
) -> None:
"""Attempts to register an account.
Args:
user_id: The desired user ID to register.
password_hash: Optional. The password hash for this user.
was_guest: Whether this is a guest account being upgraded to a
non-guest account.
make_guest: True if the the new user should be guest, false to add a
regular user account.
appservice_id: The ID of the appservice registering the user.
create_profile_with_displayname: Optionally create a profile for
the user, setting their displayname to the given value
admin: is an admin user?
user_type: type of user. One of the values from api.constants.UserTypes,
a custom value set in the configuration file, or None for a normal
user.
shadow_banned: Whether the user is shadow-banned, i.e. they may be
told their requests succeeded but we ignore them.
approved: Whether to consider the user has already been approved by an
administrator.
Raises:
StoreError if the user_id could not be registered.
"""
await self.db_pool.runInteraction(
"register_user",
self._register_user,
user_id,
password_hash,
was_guest,
make_guest,
appservice_id,
create_profile_with_displayname,
admin,
user_type,
shadow_banned,
approved,
)
def _register_user(
self,
txn: LoggingTransaction,
user_id: str,
password_hash: Optional[str],
was_guest: bool,
make_guest: bool,
appservice_id: Optional[str],
create_profile_with_displayname: Optional[str],
admin: bool,
user_type: Optional[str],
shadow_banned: bool,
approved: bool,
) -> None:
user_id_obj = UserID.from_string(user_id)
now = int(self._clock.time())
user_approved = approved or not self._require_approval
try:
if was_guest:
# Ensure that the guest user actually exists
# ``allow_none=False`` makes this raise an exception
# if the row isn't in the database.
self.db_pool.simple_select_one_txn(
txn,
"users",
keyvalues={"name": user_id, "is_guest": 1},
retcols=("name",),
allow_none=False,
)
self.db_pool.simple_update_one_txn(
txn,
"users",
keyvalues={"name": user_id, "is_guest": 1},
updatevalues={
"password_hash": password_hash,
"upgrade_ts": now,
"is_guest": 1 if make_guest else 0,
"appservice_id": appservice_id,
"admin": 1 if admin else 0,
"user_type": user_type,
"shadow_banned": shadow_banned,
"approved": user_approved,
},
)
else:
self.db_pool.simple_insert_txn(
txn,
"users",
values={
"name": user_id,
"password_hash": password_hash,
"creation_ts": now,
"is_guest": 1 if make_guest else 0,
"appservice_id": appservice_id,
"admin": 1 if admin else 0,
"user_type": user_type,
"shadow_banned": shadow_banned,
"approved": user_approved,
},
)
except self.database_engine.module.IntegrityError:
raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE)
if self._account_validity_enabled:
self.set_expiration_date_for_user_txn(txn, user_id)
if create_profile_with_displayname:
# set a default displayname serverside to avoid ugly race
# between auto-joins and clients trying to set displaynames
#
# *obviously* the 'profiles' table uses localpart for user_id
# while everything else uses the full mxid.
txn.execute(
"INSERT INTO profiles(full_user_id, user_id, displayname) VALUES (?,?,?)",
(user_id, user_id_obj.localpart, create_profile_with_displayname),
)
if self.hs.config.stats.stats_enabled:
# we create a new completed user statistics row
# we don't strictly need current_token since this user really can't
# have any state deltas before now (as it is a new user), but still,
# we include it for completeness.
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
self._update_stats_delta_txn(
txn, now, "user", user_id, {}, complete_with_stream_id=current_token
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
async def user_set_password_hash(
self, user_id: str, password_hash: Optional[str]
) -> None: