mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-07 01:20:16 +00:00
Compare commits
11 Commits
madlittlem
...
quenting/m
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c5cd804095 | ||
|
|
fcccca4aaf | ||
|
|
0a3777e4bf | ||
|
|
d79b156798 | ||
|
|
0cf5832006 | ||
|
|
03d716e17e | ||
|
|
ced8f29f5d | ||
|
|
203210b61c | ||
|
|
0f4b31f94d | ||
|
|
01a74b6534 | ||
|
|
901dbcbe8e |
1
changelog.d/18313.misc
Normal file
1
changelog.d/18313.misc
Normal file
@@ -0,0 +1 @@
|
||||
Allow a few admin APIs used by matrix-authentication-service to run on workers.
|
||||
@@ -52,7 +52,10 @@ from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
|
||||
from synapse.rest import ClientRestResource
|
||||
from synapse.rest.admin import register_servlets_for_media_repo
|
||||
from synapse.rest.admin import (
|
||||
register_servlets_for_media_repo,
|
||||
register_servlets_for_msc3861_delegation,
|
||||
)
|
||||
from synapse.rest.health import HealthResource
|
||||
from synapse.rest.key.v2 import KeyResource
|
||||
from synapse.rest.synapse.client import build_synapse_client_resource_tree
|
||||
@@ -176,8 +179,13 @@ class GenericWorkerServer(HomeServer):
|
||||
def _listen_http(self, listener_config: ListenerConfig) -> None:
|
||||
assert listener_config.http_options is not None
|
||||
|
||||
# We always include a health resource.
|
||||
resources: Dict[str, Resource] = {"/health": HealthResource()}
|
||||
# We always include an admin resource that we populate with servlets as needed
|
||||
admin_resource = JsonResource(self, canonical_json=False)
|
||||
resources: Dict[str, Resource] = {
|
||||
# We always include a health resource.
|
||||
"/health": HealthResource(),
|
||||
"/_synapse/admin": admin_resource,
|
||||
}
|
||||
|
||||
for res in listener_config.http_options.resources:
|
||||
for name in res.names:
|
||||
@@ -191,6 +199,9 @@ class GenericWorkerServer(HomeServer):
|
||||
resources.update(build_synapse_client_resource_tree(self))
|
||||
resources["/.well-known"] = well_known_resource(self)
|
||||
|
||||
if self.config.experimental.msc3861.enabled:
|
||||
register_servlets_for_msc3861_delegation(self, admin_resource)
|
||||
|
||||
elif name == "federation":
|
||||
resources[FEDERATION_PREFIX] = TransportLayerServer(self)
|
||||
elif name == "media":
|
||||
@@ -199,7 +210,6 @@ class GenericWorkerServer(HomeServer):
|
||||
|
||||
# We need to serve the admin servlets for media on the
|
||||
# worker.
|
||||
admin_resource = JsonResource(self, canonical_json=False)
|
||||
register_servlets_for_media_repo(self, admin_resource)
|
||||
|
||||
resources.update(
|
||||
@@ -207,7 +217,6 @@ class GenericWorkerServer(HomeServer):
|
||||
MEDIA_R0_PREFIX: media_repo,
|
||||
MEDIA_V3_PREFIX: media_repo,
|
||||
LEGACY_MEDIA_PREFIX: media_repo,
|
||||
"/_synapse/admin": admin_resource,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -36,10 +36,17 @@ class SetPasswordHandler:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastores().main
|
||||
self._auth_handler = hs.get_auth_handler()
|
||||
# This can only be instantiated on the main process.
|
||||
device_handler = hs.get_device_handler()
|
||||
assert isinstance(device_handler, DeviceHandler)
|
||||
self._device_handler = device_handler
|
||||
|
||||
# We don't need the device handler if password changing is disabled.
|
||||
# This allows us to instantiate the SetPasswordHandler on the workers
|
||||
# that have admin APIs for MAS
|
||||
if self._auth_handler.can_change_password():
|
||||
# This can only be instantiated on the main process.
|
||||
device_handler = hs.get_device_handler()
|
||||
assert isinstance(device_handler, DeviceHandler)
|
||||
self._device_handler: Optional[DeviceHandler] = device_handler
|
||||
else:
|
||||
self._device_handler = None
|
||||
|
||||
async def set_password(
|
||||
self,
|
||||
@@ -51,6 +58,9 @@ class SetPasswordHandler:
|
||||
if not self._auth_handler.can_change_password():
|
||||
raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN)
|
||||
|
||||
# We should have this available only if password changing is enabled.
|
||||
assert self._device_handler is not None
|
||||
|
||||
try:
|
||||
await self.store.user_set_password_hash(user_id, password_hash)
|
||||
except StoreError as e:
|
||||
|
||||
@@ -187,7 +187,6 @@ class ClientRestResource(JsonResource):
|
||||
mutual_rooms.register_servlets,
|
||||
login_token_request.register_servlets,
|
||||
rendezvous.register_servlets,
|
||||
auth_metadata.register_servlets,
|
||||
]:
|
||||
continue
|
||||
|
||||
|
||||
@@ -51,6 +51,7 @@ from synapse.rest.admin.background_updates import (
|
||||
from synapse.rest.admin.devices import (
|
||||
DeleteDevicesRestServlet,
|
||||
DeviceRestServlet,
|
||||
DevicesGetRestServlet,
|
||||
DevicesRestServlet,
|
||||
)
|
||||
from synapse.rest.admin.event_reports import (
|
||||
@@ -275,7 +276,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
"""
|
||||
Register all the admin servlets.
|
||||
"""
|
||||
# Admin servlets aren't registered on workers.
|
||||
# Admin servlets aren't registered on workers
|
||||
if hs.config.worker.worker_app is not None:
|
||||
return
|
||||
|
||||
@@ -365,3 +366,16 @@ def register_servlets_for_client_rest_resource(
|
||||
|
||||
# don't add more things here: new servlets should only be exposed on
|
||||
# /_synapse/admin so should not go here. Instead register them in AdminRestResource.
|
||||
|
||||
|
||||
def register_servlets_for_msc3861_delegation(
|
||||
hs: "HomeServer", http_server: HttpServer
|
||||
) -> None:
|
||||
"""Register servlets needed by MAS when MSC3861 is enabled"""
|
||||
if not hs.config.experimental.msc3861.enabled:
|
||||
return
|
||||
|
||||
UserRestServletV2(hs).register(http_server)
|
||||
UsernameAvailableRestServlet(hs).register(http_server)
|
||||
UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
|
||||
DevicesGetRestServlet(hs).register(http_server)
|
||||
|
||||
@@ -113,18 +113,18 @@ class DeviceRestServlet(RestServlet):
|
||||
return HTTPStatus.OK, {}
|
||||
|
||||
|
||||
class DevicesRestServlet(RestServlet):
|
||||
class DevicesGetRestServlet(RestServlet):
|
||||
"""
|
||||
Retrieve the given user's devices
|
||||
|
||||
This can be mounted on workers
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.auth = hs.get_auth()
|
||||
handler = hs.get_device_handler()
|
||||
assert isinstance(handler, DeviceHandler)
|
||||
self.device_handler = handler
|
||||
self.device_worker_handler = hs.get_device_handler()
|
||||
self.store = hs.get_datastores().main
|
||||
self.is_mine = hs.is_mine
|
||||
|
||||
@@ -141,9 +141,24 @@ class DevicesRestServlet(RestServlet):
|
||||
if u is None:
|
||||
raise NotFoundError("Unknown user")
|
||||
|
||||
devices = await self.device_handler.get_devices_by_user(target_user.to_string())
|
||||
devices = await self.device_worker_handler.get_devices_by_user(
|
||||
target_user.to_string()
|
||||
)
|
||||
return HTTPStatus.OK, {"devices": devices, "total": len(devices)}
|
||||
|
||||
|
||||
class DevicesRestServlet(DevicesGetRestServlet):
|
||||
"""
|
||||
Retrieve the given user's devices
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
assert isinstance(self.device_worker_handler, DeviceHandler)
|
||||
self.device_handler = self.device_worker_handler
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, user_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
|
||||
@@ -1501,6 +1501,45 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
|
||||
"delete_old_otks_for_next_user_batch", impl
|
||||
)
|
||||
|
||||
async def allow_master_cross_signing_key_replacement_without_uia(
|
||||
self, user_id: str, duration_ms: int
|
||||
) -> Optional[int]:
|
||||
"""Mark this user's latest master key as being replaceable without UIA.
|
||||
|
||||
Said replacement will only be permitted for a short time after calling this
|
||||
function. That time period is controlled by the duration argument.
|
||||
|
||||
Returns:
|
||||
None, if there is no such key.
|
||||
Otherwise, the timestamp before which replacement is allowed without UIA.
|
||||
"""
|
||||
timestamp = self._clock.time_msec() + duration_ms
|
||||
|
||||
def impl(txn: LoggingTransaction) -> Optional[int]:
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE e2e_cross_signing_keys
|
||||
SET updatable_without_uia_before_ms = ?
|
||||
WHERE stream_id = (
|
||||
SELECT stream_id
|
||||
FROM e2e_cross_signing_keys
|
||||
WHERE user_id = ? AND keytype = 'master'
|
||||
ORDER BY stream_id DESC
|
||||
LIMIT 1
|
||||
)
|
||||
""",
|
||||
(timestamp, user_id),
|
||||
)
|
||||
if txn.rowcount == 0:
|
||||
return None
|
||||
|
||||
return timestamp
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"allow_master_cross_signing_key_replacement_without_uia",
|
||||
impl,
|
||||
)
|
||||
|
||||
|
||||
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
def __init__(
|
||||
@@ -1755,42 +1794,3 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
],
|
||||
desc="add_e2e_signing_key",
|
||||
)
|
||||
|
||||
async def allow_master_cross_signing_key_replacement_without_uia(
|
||||
self, user_id: str, duration_ms: int
|
||||
) -> Optional[int]:
|
||||
"""Mark this user's latest master key as being replaceable without UIA.
|
||||
|
||||
Said replacement will only be permitted for a short time after calling this
|
||||
function. That time period is controlled by the duration argument.
|
||||
|
||||
Returns:
|
||||
None, if there is no such key.
|
||||
Otherwise, the timestamp before which replacement is allowed without UIA.
|
||||
"""
|
||||
timestamp = self._clock.time_msec() + duration_ms
|
||||
|
||||
def impl(txn: LoggingTransaction) -> Optional[int]:
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE e2e_cross_signing_keys
|
||||
SET updatable_without_uia_before_ms = ?
|
||||
WHERE stream_id = (
|
||||
SELECT stream_id
|
||||
FROM e2e_cross_signing_keys
|
||||
WHERE user_id = ? AND keytype = 'master'
|
||||
ORDER BY stream_id DESC
|
||||
LIMIT 1
|
||||
)
|
||||
""",
|
||||
(timestamp, user_id),
|
||||
)
|
||||
if txn.rowcount == 0:
|
||||
return None
|
||||
|
||||
return timestamp
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"allow_master_cross_signing_key_replacement_without_uia",
|
||||
impl,
|
||||
)
|
||||
|
||||
@@ -2090,6 +2090,136 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
|
||||
func=is_user_approved_txn,
|
||||
)
|
||||
|
||||
async def set_user_deactivated_status(
|
||||
self, user_id: str, deactivated: bool
|
||||
) -> None:
|
||||
"""Set the `deactivated` property for the provided user to the provided value.
|
||||
|
||||
Args:
|
||||
user_id: The ID of the user to set the status for.
|
||||
deactivated: The value to set for `deactivated`.
|
||||
"""
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"set_user_deactivated_status",
|
||||
self.set_user_deactivated_status_txn,
|
||||
user_id,
|
||||
deactivated,
|
||||
)
|
||||
|
||||
def set_user_deactivated_status_txn(
|
||||
self, txn: LoggingTransaction, user_id: str, deactivated: bool
|
||||
) -> None:
|
||||
self.db_pool.simple_update_one_txn(
|
||||
txn=txn,
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
updatevalues={"deactivated": 1 if deactivated else 0},
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_user_deactivated_status, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
self._invalidate_cache_and_stream(txn, self.is_guest, (user_id,))
|
||||
|
||||
async def set_user_suspended_status(self, user_id: str, suspended: bool) -> None:
|
||||
"""
|
||||
Set whether the user's account is suspended in the `users` table.
|
||||
|
||||
Args:
|
||||
user_id: The user ID of the user in question
|
||||
suspended: True if the user is suspended, false if not
|
||||
"""
|
||||
await self.db_pool.runInteraction(
|
||||
"set_user_suspended_status",
|
||||
self.set_user_suspended_status_txn,
|
||||
user_id,
|
||||
suspended,
|
||||
)
|
||||
|
||||
def set_user_suspended_status_txn(
|
||||
self, txn: LoggingTransaction, user_id: str, suspended: bool
|
||||
) -> None:
|
||||
self.db_pool.simple_update_one_txn(
|
||||
txn=txn,
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
updatevalues={"suspended": suspended},
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_user_suspended_status, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
|
||||
async def set_user_locked_status(self, user_id: str, locked: bool) -> None:
|
||||
"""Set the `locked` property for the provided user to the provided value.
|
||||
|
||||
Args:
|
||||
user_id: The ID of the user to set the status for.
|
||||
locked: The value to set for `locked`.
|
||||
"""
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"set_user_locked_status",
|
||||
self.set_user_locked_status_txn,
|
||||
user_id,
|
||||
locked,
|
||||
)
|
||||
|
||||
def set_user_locked_status_txn(
|
||||
self, txn: LoggingTransaction, user_id: str, locked: bool
|
||||
) -> None:
|
||||
self.db_pool.simple_update_one_txn(
|
||||
txn=txn,
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
updatevalues={"locked": locked},
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_locked_status, (user_id,))
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
|
||||
async def update_user_approval_status(
|
||||
self, user_id: UserID, approved: bool
|
||||
) -> None:
|
||||
"""Set the user's 'approved' flag to the given value.
|
||||
|
||||
The boolean will be turned into an int (in update_user_approval_status_txn)
|
||||
because the column is a smallint.
|
||||
|
||||
Args:
|
||||
user_id: the user to update the flag for.
|
||||
approved: the value to set the flag to.
|
||||
"""
|
||||
await self.db_pool.runInteraction(
|
||||
"update_user_approval_status",
|
||||
self.update_user_approval_status_txn,
|
||||
user_id.to_string(),
|
||||
approved,
|
||||
)
|
||||
|
||||
def update_user_approval_status_txn(
|
||||
self, txn: LoggingTransaction, user_id: str, approved: bool
|
||||
) -> None:
|
||||
"""Set the user's 'approved' flag to the given value.
|
||||
|
||||
The boolean is turned into an int because the column is a smallint.
|
||||
|
||||
Args:
|
||||
txn: the current database transaction.
|
||||
user_id: the user to update the flag for.
|
||||
approved: the value to set the flag to.
|
||||
"""
|
||||
self.db_pool.simple_update_one_txn(
|
||||
txn=txn,
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
updatevalues={"approved": approved},
|
||||
)
|
||||
|
||||
# Invalidate the caches of methods that read the value of the 'approved' flag.
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,))
|
||||
|
||||
|
||||
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
|
||||
def __init__(
|
||||
@@ -2202,117 +2332,6 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
|
||||
|
||||
return nb_processed
|
||||
|
||||
async def set_user_deactivated_status(
|
||||
self, user_id: str, deactivated: bool
|
||||
) -> None:
|
||||
"""Set the `deactivated` property for the provided user to the provided value.
|
||||
|
||||
Args:
|
||||
user_id: The ID of the user to set the status for.
|
||||
deactivated: The value to set for `deactivated`.
|
||||
"""
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"set_user_deactivated_status",
|
||||
self.set_user_deactivated_status_txn,
|
||||
user_id,
|
||||
deactivated,
|
||||
)
|
||||
|
||||
def set_user_deactivated_status_txn(
|
||||
self, txn: LoggingTransaction, user_id: str, deactivated: bool
|
||||
) -> None:
|
||||
self.db_pool.simple_update_one_txn(
|
||||
txn=txn,
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
updatevalues={"deactivated": 1 if deactivated else 0},
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_user_deactivated_status, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
txn.call_after(self.is_guest.invalidate, (user_id,))
|
||||
|
||||
async def set_user_suspended_status(self, user_id: str, suspended: bool) -> None:
|
||||
"""
|
||||
Set whether the user's account is suspended in the `users` table.
|
||||
|
||||
Args:
|
||||
user_id: The user ID of the user in question
|
||||
suspended: True if the user is suspended, false if not
|
||||
"""
|
||||
await self.db_pool.runInteraction(
|
||||
"set_user_suspended_status",
|
||||
self.set_user_suspended_status_txn,
|
||||
user_id,
|
||||
suspended,
|
||||
)
|
||||
|
||||
def set_user_suspended_status_txn(
|
||||
self, txn: LoggingTransaction, user_id: str, suspended: bool
|
||||
) -> None:
|
||||
self.db_pool.simple_update_one_txn(
|
||||
txn=txn,
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
updatevalues={"suspended": suspended},
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_user_suspended_status, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
|
||||
async def set_user_locked_status(self, user_id: str, locked: bool) -> None:
|
||||
"""Set the `locked` property for the provided user to the provided value.
|
||||
|
||||
Args:
|
||||
user_id: The ID of the user to set the status for.
|
||||
locked: The value to set for `locked`.
|
||||
"""
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"set_user_locked_status",
|
||||
self.set_user_locked_status_txn,
|
||||
user_id,
|
||||
locked,
|
||||
)
|
||||
|
||||
def set_user_locked_status_txn(
|
||||
self, txn: LoggingTransaction, user_id: str, locked: bool
|
||||
) -> None:
|
||||
self.db_pool.simple_update_one_txn(
|
||||
txn=txn,
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
updatevalues={"locked": locked},
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_locked_status, (user_id,))
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
|
||||
def update_user_approval_status_txn(
|
||||
self, txn: LoggingTransaction, user_id: str, approved: bool
|
||||
) -> None:
|
||||
"""Set the user's 'approved' flag to the given value.
|
||||
|
||||
The boolean is turned into an int because the column is a smallint.
|
||||
|
||||
Args:
|
||||
txn: the current database transaction.
|
||||
user_id: the user to update the flag for.
|
||||
approved: the value to set the flag to.
|
||||
"""
|
||||
self.db_pool.simple_update_one_txn(
|
||||
txn=txn,
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
updatevalues={"approved": approved},
|
||||
)
|
||||
|
||||
# Invalidate the caches of methods that read the value of the 'approved' flag.
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,))
|
||||
|
||||
|
||||
class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
|
||||
def __init__(
|
||||
@@ -2941,25 +2960,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
|
||||
start_or_continue_validation_session_txn,
|
||||
)
|
||||
|
||||
async def update_user_approval_status(
|
||||
self, user_id: UserID, approved: bool
|
||||
) -> None:
|
||||
"""Set the user's 'approved' flag to the given value.
|
||||
|
||||
The boolean will be turned into an int (in update_user_approval_status_txn)
|
||||
because the column is a smallint.
|
||||
|
||||
Args:
|
||||
user_id: the user to update the flag for.
|
||||
approved: the value to set the flag to.
|
||||
"""
|
||||
await self.db_pool.runInteraction(
|
||||
"update_user_approval_status",
|
||||
self.update_user_approval_status_txn,
|
||||
user_id.to_string(),
|
||||
approved,
|
||||
)
|
||||
|
||||
@wrap_as_background_process("delete_expired_login_tokens")
|
||||
async def _delete_expired_login_tokens(self) -> None:
|
||||
"""Remove login tokens with expiry dates that have passed."""
|
||||
|
||||
Reference in New Issue
Block a user