Compare commits

...

19 Commits

Author SHA1 Message Date
Andrew Morgan
edac7a471f 1.135.0 2025-08-01 13:12:33 +01:00
Andrew Morgan
caf5f0110e Linkify GitHub PR ID in changelog 2025-07-30 12:57:20 +01:00
Andrew Morgan
c823d2e98a 1.135.0rc2 2025-07-30 12:19:34 +01:00
Andrew Morgan
7ae7468159 Improve performance of is_server_admin by adding a cache (#18747)
Fixes https://github.com/element-hq/synapse/issues/18738
2025-07-30 10:43:39 +00:00
Quentin Gliech
61e79a4cdf Fix deactivation running off the main process (#18716)
Best reviewed commit by commit.

With the new dedicated MAS API
(https://github.com/element-hq/synapse/pull/18520), it's possible that
deactivation starts off the main process, which was not possible because
of a few calls.

I basically looked at everything that the deactivation handler was
doing, reviewed whether it could run on workers or not, and find a
workaround when possible

---------

Co-authored-by: Eric Eastwood <erice@element.io>
2025-07-24 08:43:58 +00:00
Olivier 'reivilibre
9ecf192089 Remove stray dev log 2025-07-22 13:54:32 +01:00
Olivier 'reivilibre
6838a1020b Tweak changelog again 2025-07-22 12:47:12 +01:00
Olivier 'reivilibre
a77befcc29 Tweak changelog 2025-07-22 12:16:35 +01:00
Olivier 'reivilibre
cedb8cd045 1.135.0rc1 2025-07-22 12:10:57 +01:00
dependabot[bot]
bb84121553 Bump authlib from 1.6.0 to 1.6.1 (#18704) 2025-07-22 11:57:09 +01:00
dependabot[bot]
7de4fdf61a Bump ruff from 0.12.3 to 0.12.4 (#18705) 2025-07-22 11:57:00 +01:00
dependabot[bot]
8fc9aa70a5 Bump jsonschema from 4.24.0 to 4.25.0 (#18707) 2025-07-22 11:56:43 +01:00
dependabot[bot]
3db73b974f Bump sigstore/cosign-installer from 3.9.1 to 3.9.2 (#18708) 2025-07-22 11:56:32 +01:00
dependabot[bot]
c51bd89c3b Bump serde_json from 1.0.140 to 1.0.141 (#18709) 2025-07-22 11:55:38 +01:00
dependabot[bot]
7de9ac01a0 Bump once_cell from 1.19.0 to 1.21.3 (#18710) 2025-07-22 11:55:28 +01:00
Devon Hudson
4e118aecd0 Reduce log spam when client stops downloading media while it is being streamed to them (ConsumerStopProducingError) (#18699)
The case where a consumer stops downloading media that is currently
being streamed is now able to be handled explicitly.
That scenario isn't really an error, it is expected behaviour.

This PR adds a custom exception which allows us to drop the log level
for this specific case from `WARNING` to `INFO`.


### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [X] Pull request is based on the develop branch
* [X] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [X] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct (run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Eric Eastwood <erice@element.io>
2025-07-21 20:11:46 +00:00
Shay
11a11414c5 Add an option to issue redactions as admin user on admin redaction endpoint (#18671)
Currently the [admin redaction
endpoint](https://element-hq.github.io/synapse/latest/admin_api/user_admin_api.html#redact-all-the-events-of-a-user)
defaults to puppeting the user being redacted. This PR adds an optional
param `use_admin`, which when provided issues the redactions as the
admin user instead.
2025-07-21 16:40:45 +00:00
Quentin Gliech
8a4e2e826d Dedicated MAS API (#18520)
This introduces a dedicated API for MAS to consume. Companion PR on the
MAS side: element-hq/matrix-authentication-service#4801

This has a few advantages over the previous admin API:

- it works on workers (this will be documented once we stabilise MSC3861
as a whole)
 - it is more efficient because more focused
 - it propagates trace contexts from MAS
- it is only accessible to MAS (through the shared secret) and will let
us remove the weird hack that made this token 'admin' with a ghost
'@__oidc_admin:' user

The next MAS version should support it, but will be opt-in. The version
after that should use this new API by default

---------

Co-authored-by: Eric Eastwood <erice@element.io>
2025-07-21 16:17:43 +00:00
reivilibre
875269eb53 Add experimental and incomplete support for MSC4306: Thread Subscriptions. (#18674)
Implements:
[MSC4306](https://github.com/matrix-org/matrix-spec-proposals/blob/rei/msc_thread_subscriptions/proposals/4306-thread-subscriptions.md)
(partially)

What's missing:
- Changes to push rules

Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
2025-07-21 15:54:28 +01:00
88 changed files with 4999 additions and 170 deletions

View File

@@ -120,7 +120,7 @@ jobs:
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
- name: Install Cosign
uses: sigstore/cosign-installer@398d4b0eeef1380460a10c8013a76f728fb906ac # v3.9.1
uses: sigstore/cosign-installer@d58896d6a1865668819e1d91763c7751a165e159 # v3.9.2
- name: Calculate docker image tag
uses: docker/metadata-action@902fa8ec7d6ecbf8d84d538b9b233a880e428804 # v5.7.0

View File

@@ -1,3 +1,87 @@
# Synapse 1.135.0 (2025-08-01)
No significant changes since 1.135.0rc2.
# Synapse 1.135.0rc2 (2025-07-30)
### Bugfixes
- Fix user failing to deactivate with MAS when `/_synapse/mas` is handled by a worker. ([\#18716](https://github.com/element-hq/synapse/issues/18716))
### Internal Changes
- Fix performance regression introduced in [#18238](https://github.com/element-hq/synapse/issues/18238) by adding a cache to `is_server_admin`. ([\#18747](https://github.com/element-hq/synapse/issues/18747))
# Synapse 1.135.0rc1 (2025-07-22)
### Features
- Add `recaptcha_private_key_path` and `recaptcha_public_key_path` config option. ([\#17984](https://github.com/element-hq/synapse/issues/17984), [\#18684](https://github.com/element-hq/synapse/issues/18684))
- Add plain-text handling for rich-text topics as per [MSC3765](https://github.com/matrix-org/matrix-spec-proposals/pull/3765). ([\#18195](https://github.com/element-hq/synapse/issues/18195))
- If enabled by the user, server admins will see [soft failed](https://spec.matrix.org/v1.13/server-server-api/#soft-failure) events over the Client-Server API. ([\#18238](https://github.com/element-hq/synapse/issues/18238))
- Add experimental support for [MSC4277: Harmonizing the reporting endpoints](https://github.com/matrix-org/matrix-spec-proposals/pull/4277). ([\#18263](https://github.com/element-hq/synapse/issues/18263))
- Add ability to limit amount of media uploaded by a user in a given time period. ([\#18527](https://github.com/element-hq/synapse/issues/18527))
- Enable workers to write directly to the device lists stream and handle device list updates, reducing load on the main process. ([\#18581](https://github.com/element-hq/synapse/issues/18581))
- Support arbitrary profile fields. Contributed by @clokep. ([\#18635](https://github.com/element-hq/synapse/issues/18635))
- Advertise support for Matrix v1.12. ([\#18647](https://github.com/element-hq/synapse/issues/18647))
- Add an option to issue redactions as an admin user via the [admin redaction endpoint](https://element-hq.github.io/synapse/latest/admin_api/user_admin_api.html#redact-all-the-events-of-a-user). ([\#18671](https://github.com/element-hq/synapse/issues/18671))
- Add experimental and incomplete support for [MSC4306: Thread Subscriptions](https://github.com/matrix-org/matrix-spec-proposals/blob/rei/msc_thread_subscriptions/proposals/4306-thread-subscriptions.md). ([\#18674](https://github.com/element-hq/synapse/issues/18674))
- Include `event_id` when getting state with `?format=event`. Contributed by @tulir @ Beeper. ([\#18675](https://github.com/element-hq/synapse/issues/18675))
### Bugfixes
- Fix CPU and database spinning when retrying sending events to servers whilst at the same time purging those events. ([\#18499](https://github.com/element-hq/synapse/issues/18499))
- Don't allow creation of tags with names longer than 255 bytes, [as per the spec](https://spec.matrix.org/v1.15/client-server-api/#events-14). ([\#18660](https://github.com/element-hq/synapse/issues/18660))
- Fix `sliding_sync_connections`-related errors when porting from SQLite to Postgres. ([\#18677](https://github.com/element-hq/synapse/issues/18677))
- Fix the MAS integration not working when Synapse is started with `--daemonize` or using `synctl`. ([\#18691](https://github.com/element-hq/synapse/issues/18691))
### Improved Documentation
- Document that some config options for the user directory are in violation of the Matrix spec. ([\#18548](https://github.com/element-hq/synapse/issues/18548))
- Update `rc_delayed_event_mgmt` docs to the actual nesting level. Contributed by @HarHarLinks. ([\#18692](https://github.com/element-hq/synapse/issues/18692))
### Internal Changes
- Add a dedicated internal API for Matrix Authentication Service to Synapse communication. ([\#18520](https://github.com/element-hq/synapse/issues/18520))
- Allow user registrations to be done on workers. ([\#18552](https://github.com/element-hq/synapse/issues/18552))
- Remove unnecessary HTTP replication calls. ([\#18564](https://github.com/element-hq/synapse/issues/18564))
- Refactor `Measure` block metrics to be homeserver-scoped. ([\#18601](https://github.com/element-hq/synapse/issues/18601))
- Refactor cache metrics to be homeserver-scoped. ([\#18604](https://github.com/element-hq/synapse/issues/18604))
- Unbreak "Latest dependencies" workflow by using the `--without dev` poetry option instead of removed `--no-dev`. ([\#18617](https://github.com/element-hq/synapse/issues/18617))
- Update URL Preview code to work with `lxml` 6.0.0+. ([\#18622](https://github.com/element-hq/synapse/issues/18622))
- Use `markdown-it-py` instead of `commonmark` in the release script. ([\#18637](https://github.com/element-hq/synapse/issues/18637))
- Fix typing errors with upgraded mypy version. ([\#18653](https://github.com/element-hq/synapse/issues/18653))
- Add doc comment explaining that config files are shallowly merged. ([\#18664](https://github.com/element-hq/synapse/issues/18664))
- Minor speed up of insertion into `stream_positions` table. ([\#18672](https://github.com/element-hq/synapse/issues/18672))
- Remove unused `allow_no_prev_events` option when creating an event. ([\#18676](https://github.com/element-hq/synapse/issues/18676))
- Clean up `MetricsResource` and Prometheus hacks. ([\#18687](https://github.com/element-hq/synapse/issues/18687))
- Fix dirty `Cargo.lock` changes appearing after install (`base64`). ([\#18689](https://github.com/element-hq/synapse/issues/18689))
- Prevent dirty `Cargo.lock` changes from install. ([\#18693](https://github.com/element-hq/synapse/issues/18693))
- Correct spelling of 'Admin token used' log line. ([\#18697](https://github.com/element-hq/synapse/issues/18697))
- Reduce log spam when client stops downloading media while it is being streamed to them. ([\#18699](https://github.com/element-hq/synapse/issues/18699))
### Updates to locked dependencies
* Bump authlib from 1.6.0 to 1.6.1. ([\#18704](https://github.com/element-hq/synapse/issues/18704))
* Bump base64 from 0.21.7 to 0.22.1. ([\#18666](https://github.com/element-hq/synapse/issues/18666))
* Bump jsonschema from 4.24.0 to 4.25.0. ([\#18707](https://github.com/element-hq/synapse/issues/18707))
* Bump lxml from 5.4.0 to 6.0.0. ([\#18631](https://github.com/element-hq/synapse/issues/18631))
* Bump mypy from 1.13.0 to 1.16.1. ([\#18653](https://github.com/element-hq/synapse/issues/18653))
* Bump once_cell from 1.19.0 to 1.21.3. ([\#18710](https://github.com/element-hq/synapse/issues/18710))
* Bump phonenumbers from 9.0.8 to 9.0.9. ([\#18681](https://github.com/element-hq/synapse/issues/18681))
* Bump ruff from 0.12.2 to 0.12.5. ([\#18683](https://github.com/element-hq/synapse/issues/18683), [\#18705](https://github.com/element-hq/synapse/issues/18705))
* Bump serde_json from 1.0.140 to 1.0.141. ([\#18709](https://github.com/element-hq/synapse/issues/18709))
* Bump sigstore/cosign-installer from 3.9.1 to 3.9.2. ([\#18708](https://github.com/element-hq/synapse/issues/18708))
* Bump types-jsonschema from 4.24.0.20250528 to 4.24.0.20250708. ([\#18682](https://github.com/element-hq/synapse/issues/18682))
# Synapse 1.134.0 (2025-07-15)
No significant changes since 1.134.0rc1.

8
Cargo.lock generated
View File

@@ -887,9 +887,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.19.0"
version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "openssl-probe"
@@ -1355,9 +1355,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.140"
version = "1.0.141"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373"
checksum = "30b9eff21ebe718216c6ec64e1d9ac57087aad11efc64e32002bce4a0d4c03d3"
dependencies = [
"itoa",
"memchr",

View File

@@ -1 +0,0 @@
Add `recaptcha_private_key_path` and `recaptcha_public_key_path` config option.

View File

@@ -1 +0,0 @@
Add plain-text handling for rich-text topics as per [MSC3765](https://github.com/matrix-org/matrix-spec-proposals/pull/3765).

View File

@@ -1 +0,0 @@
If enabled by the user, server admins will see [soft failed](https://spec.matrix.org/v1.13/server-server-api/#soft-failure) events over the Client-Server API.

View File

@@ -1 +0,0 @@
Add experimental support for [MSC4277](https://github.com/matrix-org/matrix-spec-proposals/pull/4277).

View File

@@ -1 +0,0 @@
Fix CPU and database spinning when retrying sending events to servers whilst at the same time purging those events.

View File

@@ -1 +0,0 @@
Add ability to limit amount uploaded by a user in a given time period.

View File

@@ -1 +0,0 @@
Document that some config options for the user directory are in violation of the Matrix spec.

View File

@@ -1 +0,0 @@
Allow user registrations to be done on workers.

View File

@@ -1 +0,0 @@
Remove unnecessary HTTP replication calls.

View File

@@ -1 +0,0 @@
Enable workers to write directly to the device lists stream and handle device list updates, reducing load on the main process.

View File

@@ -1 +0,0 @@
Refactor `Measure` block metrics to be homeserver-scoped.

View File

@@ -1 +0,0 @@
Refactor cache metrics to be homeserver-scoped.

View File

@@ -1 +0,0 @@
Unbreak "Latest dependencies" workflow by using the `--without dev` poetry option instead of removed `--no-dev`.

View File

@@ -1 +0,0 @@
Update URL Preview code to work with `lxml` 6.0.0+.

View File

@@ -1 +0,0 @@
Support arbitrary profile fields.

View File

@@ -1 +0,0 @@
Use `markdown-it-py` instead of `commonmark` in the release script.

View File

@@ -1 +0,0 @@
Advertise support for Matrix v1.12.

View File

@@ -1 +0,0 @@
Fix typing errors with upgraded mypy version.

View File

@@ -1 +0,0 @@
Don't allow creation of tags with names longer than 255 bytes, as per the spec.

View File

@@ -1 +0,0 @@
Add doc comment explaining that config files are shallowly merged.

View File

@@ -1 +0,0 @@
Minor speed up of insertion into `stream_positions` table.

View File

@@ -1 +0,0 @@
Include `event_id` when getting state with `?format=event`. Contributed by @tulir @ Beeper.

View File

@@ -1 +0,0 @@
Remove unused `allow_no_prev_events` option when creating an event.

View File

@@ -1 +0,0 @@
Fix sliding_sync_connections related errors when porting from SQLite to Postgres.

View File

@@ -1 +0,0 @@
Add `recaptcha_private_key_path` and `recaptcha_public_key_path` config option.

View File

@@ -1 +0,0 @@
Clean up `MetricsResource` and Prometheus hacks.

View File

@@ -1 +0,0 @@
Fix dirty `Cargo.lock` changes appearing after install (`base64`).

View File

@@ -1 +0,0 @@
Fix the MAS integration not working when Synapse is started with `--daemonize` or using `synctl`.

View File

@@ -1 +0,0 @@
Update `rc_delayed_event_mgmt` docs to the actual nesting level. Contributed by @HarHarLinks.

View File

@@ -1 +0,0 @@
Prevent dirty `Cargo.lock` changes from install.

View File

@@ -1 +0,0 @@
Correct spelling of 'Admin token used' log line.

18
debian/changelog vendored
View File

@@ -1,3 +1,21 @@
matrix-synapse-py3 (1.135.0) stable; urgency=medium
* New Synapse release 1.135.0.
-- Synapse Packaging team <packages@matrix.org> Fri, 01 Aug 2025 13:12:28 +0100
matrix-synapse-py3 (1.135.0~rc2) stable; urgency=medium
* New Synapse release 1.135.0rc2.
-- Synapse Packaging team <packages@matrix.org> Wed, 30 Jul 2025 12:19:14 +0100
matrix-synapse-py3 (1.135.0~rc1) stable; urgency=medium
* New Synapse release 1.135.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 22 Jul 2025 12:08:37 +0100
matrix-synapse-py3 (1.134.0) stable; urgency=medium
* New Synapse release 1.134.0.

View File

@@ -178,6 +178,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/deactivate$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)",
"^/_matrix/client/(r0|v3)/delete_devices$",
"^/_matrix/client/versions$",
@@ -327,6 +328,15 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"thread_subscriptions": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/unstable/io.element.msc4306/.*",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
}
# Templates for sections that may be inserted multiple times in config files
@@ -427,6 +437,7 @@ def add_worker_roles_to_shared_config(
"to_device",
"typing",
"push_rules",
"thread_subscriptions",
}
# Worker-type specific sharding config. Now a single worker can fulfill multiple

View File

@@ -1464,8 +1464,11 @@ _Added in Synapse 1.72.0._
## Redact all the events of a user
This endpoint allows an admin to redact the events of a given user. There are no restrictions on redactions for a
local user. By default, we puppet the user who sent the message to redact it themselves. Redactions for non-local users are issued using the admin user, and will fail in rooms where the admin user is not admin/does not have the specified power level to issue redactions.
This endpoint allows an admin to redact the events of a given user. There are no restrictions on
redactions for a local user. By default, we puppet the user who sent the message to redact it themselves.
Redactions for non-local users are issued using the admin user, and will fail in rooms where the
admin user is not admin/does not have the specified power level to issue redactions. An option
is provided to override the default and allow the admin to issue the redactions in all cases.
The API is
```
@@ -1501,7 +1504,10 @@ The following JSON body parameter must be provided:
The following JSON body parameters are optional:
- `reason` - Reason the redaction is being requested, ie "spam", "abuse", etc. This will be included in each redaction event, and be visible to users.
- `limit` - a limit on the number of the user's events to search for ones that can be redacted (events are redacted newest to oldest) in each room, defaults to 1000 if not provided
- `limit` - a limit on the number of the user's events to search for ones that can be redacted (events are redacted newest to oldest) in each room, defaults to 1000 if not provided.
- `use_admin` - If set to `true`, the admin user is used to issue the redactions, rather than puppeting the user. Useful
when the admin is also the moderator of the rooms that require redactions. Note that the redactions will fail in rooms
where the admin does not have the sufficient power level to issue the redactions.
_Added in Synapse 1.116.0._

View File

@@ -238,6 +238,7 @@ information.
^/_matrix/client/unstable/im.nheko.summary/summary/.*$
^/_matrix/client/(r0|v3|unstable)/account/3pid$
^/_matrix/client/(r0|v3|unstable)/account/whoami$
^/_matrix/client/(r0|v3|unstable)/account/deactivate$
^/_matrix/client/(r0|v3)/delete_devices$
^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)
^/_matrix/client/versions$

100
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "annotated-types"
@@ -34,15 +34,15 @@ tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" a
[[package]]
name = "authlib"
version = "1.6.0"
version = "1.6.1"
description = "The ultimate Python library in building OAuth and OpenID Connect servers and clients."
optional = true
python-versions = ">=3.9"
groups = ["main"]
markers = "extra == \"oidc\" or extra == \"jwt\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"jwt\" or extra == \"oidc\""
files = [
{file = "authlib-1.6.0-py2.py3-none-any.whl", hash = "sha256:91685589498f79e8655e8a8947431ad6288831d643f11c55c2143ffcc738048d"},
{file = "authlib-1.6.0.tar.gz", hash = "sha256:4367d32031b7af175ad3a323d571dc7257b7099d55978087ceae4a0d88cd3210"},
{file = "authlib-1.6.1-py2.py3-none-any.whl", hash = "sha256:e9d2031c34c6309373ab845afc24168fe9e93dc52d252631f52642f21f5ed06e"},
{file = "authlib-1.6.1.tar.gz", hash = "sha256:4dffdbb1460ba6ec8c17981a4c67af7d8af131231b5a36a88a1e8c80c111cdfd"},
]
[package.dependencies]
@@ -435,7 +435,7 @@ description = "XML bomb protection for Python stdlib modules"
optional = true
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61"},
{file = "defusedxml-0.7.1.tar.gz", hash = "sha256:1bb3032db185915b62d7c6209c5a8792be6a32ab2fedacc84e01b52c51aa3e69"},
@@ -478,7 +478,7 @@ description = "XPath 1.0/2.0/3.0/3.1 parsers and selectors for ElementTree and l
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "elementpath-4.1.5-py3-none-any.whl", hash = "sha256:2ac1a2fb31eb22bbbf817f8cf6752f844513216263f0e3892c8e79782fe4bb55"},
{file = "elementpath-4.1.5.tar.gz", hash = "sha256:c2d6dc524b29ef751ecfc416b0627668119d8812441c555d7471da41d4bacb8d"},
@@ -528,7 +528,7 @@ description = "Python wrapper for hiredis"
optional = true
python-versions = ">=3.8"
groups = ["main"]
markers = "extra == \"redis\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"redis\""
files = [
{file = "hiredis-3.2.1-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:add17efcbae46c5a6a13b244ff0b4a8fa079602ceb62290095c941b42e9d5dec"},
{file = "hiredis-3.2.1-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:5fe955cc4f66c57df1ae8e5caf4de2925d43b5efab4e40859662311d1bcc5f54"},
@@ -865,7 +865,7 @@ description = "Jaeger Python OpenTracing Tracer implementation"
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"opentracing\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"opentracing\""
files = [
{file = "jaeger-client-4.8.0.tar.gz", hash = "sha256:3157836edab8e2c209bd2d6ae61113db36f7ee399e66b1dcbb715d87ab49bfe0"},
]
@@ -936,14 +936,14 @@ i18n = ["Babel (>=2.7)"]
[[package]]
name = "jsonschema"
version = "4.24.0"
version = "4.25.0"
description = "An implementation of JSON Schema validation for Python"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "jsonschema-4.24.0-py3-none-any.whl", hash = "sha256:a462455f19f5faf404a7902952b6f0e3ce868f3ee09a359b05eca6673bd8412d"},
{file = "jsonschema-4.24.0.tar.gz", hash = "sha256:0b4e8069eb12aedfa881333004bccaec24ecef5a8a6a4b6df142b2cc9599d196"},
{file = "jsonschema-4.25.0-py3-none-any.whl", hash = "sha256:24c2e8da302de79c8b9382fee3e76b355e44d2a4364bb207159ce10b517bd716"},
{file = "jsonschema-4.25.0.tar.gz", hash = "sha256:e63acf5c11762c0e6672ffb61482bdf57f0876684d8d249c0fe2d730d48bc55f"},
]
[package.dependencies]
@@ -954,7 +954,7 @@ rpds-py = ">=0.7.1"
[package.extras]
format = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3987", "uri-template", "webcolors (>=1.11)"]
format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=24.6.0)"]
format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "rfc3987-syntax (>=1.1.0)", "uri-template", "webcolors (>=24.6.0)"]
[[package]]
name = "jsonschema-specifications"
@@ -1003,7 +1003,7 @@ description = "A strictly RFC 4510 conforming LDAP V3 pure Python client library
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\""
files = [
{file = "ldap3-2.9.1-py2.py3-none-any.whl", hash = "sha256:5869596fc4948797020d3f03b7939da938778a0f9e2009f7a072ccf92b8e8d70"},
{file = "ldap3-2.9.1.tar.gz", hash = "sha256:f3e7fc4718e3f09dda568b57100095e0ce58633bcabbed8667ce3f8fbaa4229f"},
@@ -1019,7 +1019,7 @@ description = "Powerful and Pythonic XML processing library combining libxml2/li
optional = true
python-versions = ">=3.8"
groups = ["main"]
markers = "extra == \"url-preview\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"url-preview\""
files = [
{file = "lxml-6.0.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:35bc626eec405f745199200ccb5c6b36f202675d204aa29bb52e27ba2b71dea8"},
{file = "lxml-6.0.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:246b40f8a4aec341cbbf52617cad8ab7c888d944bfe12a6abd2b1f6cfb6f6082"},
@@ -1260,7 +1260,7 @@ description = "An LDAP3 auth provider for Synapse"
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\""
files = [
{file = "matrix-synapse-ldap3-0.3.0.tar.gz", hash = "sha256:8bb6517173164d4b9cc44f49de411d8cebdb2e705d5dd1ea1f38733c4a009e1d"},
{file = "matrix_synapse_ldap3-0.3.0-py3-none-any.whl", hash = "sha256:8b4d701f8702551e98cc1d8c20dbed532de5613584c08d0df22de376ba99159d"},
@@ -1493,7 +1493,7 @@ description = "OpenTracing API for Python. See documentation at http://opentraci
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"opentracing\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"opentracing\""
files = [
{file = "opentracing-2.4.0.tar.gz", hash = "sha256:a173117e6ef580d55874734d1fa7ecb6f3655160b8b8974a2a1e98e5ec9c840d"},
]
@@ -1699,7 +1699,7 @@ description = "psycopg2 - Python-PostgreSQL Database Adapter"
optional = true
python-versions = ">=3.8"
groups = ["main"]
markers = "extra == \"postgres\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"postgres\""
files = [
{file = "psycopg2-2.9.10-cp310-cp310-win32.whl", hash = "sha256:5df2b672140f95adb453af93a7d669d7a7bf0a56bcd26f1502329166f4a61716"},
{file = "psycopg2-2.9.10-cp310-cp310-win_amd64.whl", hash = "sha256:c6f7b8561225f9e711a9c47087388a97fdc948211c10a4bccbf0ba68ab7b3b5a"},
@@ -1720,7 +1720,7 @@ description = ".. image:: https://travis-ci.org/chtd/psycopg2cffi.svg?branch=mas
optional = true
python-versions = "*"
groups = ["main"]
markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")"
markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")"
files = [
{file = "psycopg2cffi-2.9.0.tar.gz", hash = "sha256:7e272edcd837de3a1d12b62185eb85c45a19feda9e62fa1b120c54f9e8d35c52"},
]
@@ -1736,7 +1736,7 @@ description = "A Simple library to enable psycopg2 compatability"
optional = true
python-versions = "*"
groups = ["main"]
markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")"
markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")"
files = [
{file = "psycopg2cffi-compat-1.1.tar.gz", hash = "sha256:d25e921748475522b33d13420aad5c2831c743227dc1f1f2585e0fdb5c914e05"},
]
@@ -1996,7 +1996,7 @@ description = "A development tool to measure, monitor and analyze the memory beh
optional = true
python-versions = ">=3.6"
groups = ["main"]
markers = "extra == \"cache-memory\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"cache-memory\""
files = [
{file = "Pympler-1.0.1-py3-none-any.whl", hash = "sha256:d260dda9ae781e1eab6ea15bacb84015849833ba5555f141d2d9b7b7473b307d"},
{file = "Pympler-1.0.1.tar.gz", hash = "sha256:993f1a3599ca3f4fcd7160c7545ad06310c9e12f70174ae7ae8d4e25f6c5d3fa"},
@@ -2056,7 +2056,7 @@ description = "Python implementation of SAML Version 2 Standard"
optional = true
python-versions = ">=3.9,<4.0"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "pysaml2-7.5.0-py3-none-any.whl", hash = "sha256:bc6627cc344476a83c757f440a73fda1369f13b6fda1b4e16bca63ffbabb5318"},
{file = "pysaml2-7.5.0.tar.gz", hash = "sha256:f36871d4e5ee857c6b85532e942550d2cf90ea4ee943d75eb681044bbc4f54f7"},
@@ -2081,7 +2081,7 @@ description = "Extensions to the standard Python datetime module"
optional = true
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
{file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"},
@@ -2109,7 +2109,7 @@ description = "World timezone definitions, modern and historical"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "pytz-2022.7.1-py2.py3-none-any.whl", hash = "sha256:78f4f37d8198e0627c5f1143240bb0206b8691d8d7ac6d78fee88b78733f8c4a"},
{file = "pytz-2022.7.1.tar.gz", hash = "sha256:01a0681c4b9684a28304615eba55d1ab31ae00bf68ec157ec3708a8182dbbcd0"},
@@ -2408,30 +2408,30 @@ files = [
[[package]]
name = "ruff"
version = "0.12.3"
version = "0.12.4"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
groups = ["dev"]
files = [
{file = "ruff-0.12.3-py3-none-linux_armv6l.whl", hash = "sha256:47552138f7206454eaf0c4fe827e546e9ddac62c2a3d2585ca54d29a890137a2"},
{file = "ruff-0.12.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:0a9153b000c6fe169bb307f5bd1b691221c4286c133407b8827c406a55282041"},
{file = "ruff-0.12.3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:fa6b24600cf3b750e48ddb6057e901dd5b9aa426e316addb2a1af185a7509882"},
{file = "ruff-0.12.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e2506961bf6ead54887ba3562604d69cb430f59b42133d36976421bc8bd45901"},
{file = "ruff-0.12.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:c4faaff1f90cea9d3033cbbcdf1acf5d7fb11d8180758feb31337391691f3df0"},
{file = "ruff-0.12.3-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:40dced4a79d7c264389de1c59467d5d5cefd79e7e06d1dfa2c75497b5269a5a6"},
{file = "ruff-0.12.3-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:0262d50ba2767ed0fe212aa7e62112a1dcbfd46b858c5bf7bbd11f326998bafc"},
{file = "ruff-0.12.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:12371aec33e1a3758597c5c631bae9a5286f3c963bdfb4d17acdd2d395406687"},
{file = "ruff-0.12.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:560f13b6baa49785665276c963edc363f8ad4b4fc910a883e2625bdb14a83a9e"},
{file = "ruff-0.12.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:023040a3499f6f974ae9091bcdd0385dd9e9eb4942f231c23c57708147b06311"},
{file = "ruff-0.12.3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:883d844967bffff5ab28bba1a4d246c1a1b2933f48cb9840f3fdc5111c603b07"},
{file = "ruff-0.12.3-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:2120d3aa855ff385e0e562fdee14d564c9675edbe41625c87eeab744a7830d12"},
{file = "ruff-0.12.3-py3-none-musllinux_1_2_i686.whl", hash = "sha256:6b16647cbb470eaf4750d27dddc6ebf7758b918887b56d39e9c22cce2049082b"},
{file = "ruff-0.12.3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:e1417051edb436230023575b149e8ff843a324557fe0a265863b7602df86722f"},
{file = "ruff-0.12.3-py3-none-win32.whl", hash = "sha256:dfd45e6e926deb6409d0616078a666ebce93e55e07f0fb0228d4b2608b2c248d"},
{file = "ruff-0.12.3-py3-none-win_amd64.whl", hash = "sha256:a946cf1e7ba3209bdef039eb97647f1c77f6f540e5845ec9c114d3af8df873e7"},
{file = "ruff-0.12.3-py3-none-win_arm64.whl", hash = "sha256:5f9c7c9c8f84c2d7f27e93674d27136fbf489720251544c4da7fb3d742e011b1"},
{file = "ruff-0.12.3.tar.gz", hash = "sha256:f1b5a4b6668fd7b7ea3697d8d98857390b40c1320a63a178eee6be0899ea2d77"},
{file = "ruff-0.12.4-py3-none-linux_armv6l.whl", hash = "sha256:cb0d261dac457ab939aeb247e804125a5d521b21adf27e721895b0d3f83a0d0a"},
{file = "ruff-0.12.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:55c0f4ca9769408d9b9bac530c30d3e66490bd2beb2d3dae3e4128a1f05c7442"},
{file = "ruff-0.12.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:a8224cc3722c9ad9044da7f89c4c1ec452aef2cfe3904365025dd2f51daeae0e"},
{file = "ruff-0.12.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e9949d01d64fa3672449a51ddb5d7548b33e130240ad418884ee6efa7a229586"},
{file = "ruff-0.12.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:be0593c69df9ad1465e8a2d10e3defd111fdb62dcd5be23ae2c06da77e8fcffb"},
{file = "ruff-0.12.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a7dea966bcb55d4ecc4cc3270bccb6f87a337326c9dcd3c07d5b97000dbff41c"},
{file = "ruff-0.12.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:afcfa3ab5ab5dd0e1c39bf286d829e042a15e966b3726eea79528e2e24d8371a"},
{file = "ruff-0.12.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c057ce464b1413c926cdb203a0f858cd52f3e73dcb3270a3318d1630f6395bb3"},
{file = "ruff-0.12.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:e64b90d1122dc2713330350626b10d60818930819623abbb56535c6466cce045"},
{file = "ruff-0.12.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2abc48f3d9667fdc74022380b5c745873499ff827393a636f7a59da1515e7c57"},
{file = "ruff-0.12.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:2b2449dc0c138d877d629bea151bee8c0ae3b8e9c43f5fcaafcd0c0d0726b184"},
{file = "ruff-0.12.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:56e45bb11f625db55f9b70477062e6a1a04d53628eda7784dce6e0f55fd549eb"},
{file = "ruff-0.12.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:478fccdb82ca148a98a9ff43658944f7ab5ec41c3c49d77cd99d44da019371a1"},
{file = "ruff-0.12.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:0fc426bec2e4e5f4c4f182b9d2ce6a75c85ba9bcdbe5c6f2a74fcb8df437df4b"},
{file = "ruff-0.12.4-py3-none-win32.whl", hash = "sha256:4de27977827893cdfb1211d42d84bc180fceb7b72471104671c59be37041cf93"},
{file = "ruff-0.12.4-py3-none-win_amd64.whl", hash = "sha256:fe0b9e9eb23736b453143d72d2ceca5db323963330d5b7859d60d101147d461a"},
{file = "ruff-0.12.4-py3-none-win_arm64.whl", hash = "sha256:0618ec4442a83ab545e5b71202a5c0ed7791e8471435b94e655b570a5031a98e"},
{file = "ruff-0.12.4.tar.gz", hash = "sha256:13efa16df6c6eeb7d0f091abae50f58e9522f3843edb40d56ad52a5a4a4b6873"},
]
[[package]]
@@ -2474,7 +2474,7 @@ description = "Python client for Sentry (https://sentry.io)"
optional = true
python-versions = ">=3.6"
groups = ["main"]
markers = "extra == \"sentry\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"sentry\""
files = [
{file = "sentry_sdk-2.32.0-py2.py3-none-any.whl", hash = "sha256:6cf51521b099562d7ce3606da928c473643abe99b00ce4cb5626ea735f4ec345"},
{file = "sentry_sdk-2.32.0.tar.gz", hash = "sha256:9016c75d9316b0f6921ac14c8cd4fb938f26002430ac5be9945ab280f78bec6b"},
@@ -2662,7 +2662,7 @@ description = "Tornado IOLoop Backed Concurrent Futures"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"opentracing\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"opentracing\""
files = [
{file = "threadloop-1.0.2-py2-none-any.whl", hash = "sha256:5c90dbefab6ffbdba26afb4829d2a9df8275d13ac7dc58dccb0e279992679599"},
{file = "threadloop-1.0.2.tar.gz", hash = "sha256:8b180aac31013de13c2ad5c834819771992d350267bddb854613ae77ef571944"},
@@ -2678,7 +2678,7 @@ description = "Python bindings for the Apache Thrift RPC system"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"opentracing\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"opentracing\""
files = [
{file = "thrift-0.16.0.tar.gz", hash = "sha256:2b5b6488fcded21f9d312aa23c9ff6a0195d0f6ae26ddbd5ad9e3e25dfc14408"},
]
@@ -2740,7 +2740,7 @@ description = "Tornado is a Python web framework and asynchronous networking lib
optional = true
python-versions = ">=3.9"
groups = ["main"]
markers = "extra == \"opentracing\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"opentracing\""
files = [
{file = "tornado-6.5-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:f81067dad2e4443b015368b24e802d0083fecada4f0a4572fdb72fc06e54a9a6"},
{file = "tornado-6.5-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:9ac1cbe1db860b3cbb251e795c701c41d343f06a96049d6274e7c77559117e41"},
@@ -2877,7 +2877,7 @@ description = "non-blocking redis client for python"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"redis\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"redis\""
files = [
{file = "txredisapi-1.4.11-py3-none-any.whl", hash = "sha256:ac64d7a9342b58edca13ef267d4fa7637c1aa63f8595e066801c1e8b56b22d0b"},
{file = "txredisapi-1.4.11.tar.gz", hash = "sha256:3eb1af99aefdefb59eb877b1dd08861efad60915e30ad5bf3d5bf6c5cedcdbc6"},
@@ -3208,7 +3208,7 @@ description = "An XML Schema validator and decoder"
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"saml2\" or extra == \"all\""
markers = "extra == \"all\" or extra == \"saml2\""
files = [
{file = "xmlschema-2.4.0-py3-none-any.whl", hash = "sha256:dc87be0caaa61f42649899189aab2fd8e0d567f2cf548433ba7b79278d231a4a"},
{file = "xmlschema-2.4.0.tar.gz", hash = "sha256:d74cd0c10866ac609e1ef94a5a69b018ad16e39077bc6393408b40c6babee793"},
@@ -3352,4 +3352,4 @@ url-preview = ["lxml"]
[metadata]
lock-version = "2.1"
python-versions = "^3.9.0"
content-hash = "a6965a294ca751ec2b5b0b92a050acc9afd4efb3e58550845dd32c60b74a70d1"
content-hash = "b1a0f4708465fd597d0bc7ebb09443ce0e2613cd58a33387a28036249f26856b"

View File

@@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.134.0"
version = "1.135.0"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"
@@ -319,7 +319,7 @@ all = [
# failing on new releases. Keeping lower bounds loose here means that dependabot
# can bump versions without having to update the content-hash in the lockfile.
# This helps prevents merge conflicts when running a batch of dependabot updates.
ruff = "0.12.3"
ruff = "0.12.4"
# Type checking only works with the pydantic.v1 compat module from pydantic v2
pydantic = "^2"

View File

@@ -1,5 +1,5 @@
$schema: https://element-hq.github.io/synapse/latest/schema/v1/meta.schema.json
$id: https://element-hq.github.io/synapse/schema/synapse/v1.134/synapse-config.schema.json
$id: https://element-hq.github.io/synapse/schema/synapse/v1.135/synapse-config.schema.json
type: object
properties:
modules:

View File

@@ -48,6 +48,7 @@ if TYPE_CHECKING or HAS_PYDANTIC_V2:
conint,
constr,
parse_obj_as,
root_validator,
validator,
)
from pydantic.v1.error_wrappers import ErrorWrapper
@@ -68,6 +69,7 @@ else:
conint,
constr,
parse_obj_as,
root_validator,
validator,
)
from pydantic.error_wrappers import ErrorWrapper
@@ -92,4 +94,5 @@ __all__ = (
"StrictStr",
"ValidationError",
"validator",
"root_validator",
)

View File

@@ -136,6 +136,7 @@ BOOLEAN_COLUMNS = {
"has_known_state",
"is_encrypted",
],
"thread_subscriptions": ["subscribed", "automatic"],
"users": ["shadow_banned", "approved", "locked", "suspended"],
"un_partial_stated_event_stream": ["rejection_status_changed"],
"users_who_share_rooms": ["share_private"],

View File

@@ -369,6 +369,12 @@ class MSC3861DelegatedAuth(BaseAuth):
async def is_server_admin(self, requester: Requester) -> bool:
return "urn:synapse:admin:*" in requester.scope
def _is_access_token_the_admin_token(self, token: str) -> bool:
admin_token = self._admin_token()
if admin_token is None:
return False
return token == admin_token
async def get_user_by_req(
self,
request: SynapseRequest,
@@ -434,7 +440,7 @@ class MSC3861DelegatedAuth(BaseAuth):
requester = await self.get_user_by_access_token(access_token, allow_expired)
# Do not record requests from MAS using the virtual `__oidc_admin` user.
if access_token != self._admin_token():
if not self._is_access_token_the_admin_token(access_token):
await self._record_request(request, requester)
if not allow_guest and requester.is_guest:
@@ -470,13 +476,25 @@ class MSC3861DelegatedAuth(BaseAuth):
raise UnrecognizedRequestError(code=404)
def is_request_using_the_admin_token(self, request: SynapseRequest) -> bool:
"""
Check if the request is using the admin token.
Args:
request: The request to check.
Returns:
True if the request is using the admin token, False otherwise.
"""
access_token = self.get_access_token_from_request(request)
return self._is_access_token_the_admin_token(access_token)
async def get_user_by_access_token(
self,
token: str,
allow_expired: bool = False,
) -> Requester:
admin_token = self._admin_token()
if admin_token is not None and token == admin_token:
if self._is_access_token_the_admin_token(token):
# XXX: This is a temporary solution so that the admin API can be called by
# the OIDC provider. This will be removed once we have OIDC client
# credentials grant support in matrix-authentication-service.

View File

@@ -104,6 +104,9 @@ from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
from synapse.storage.databases.main.thread_subscriptions import (
ThreadSubscriptionsWorkerStore,
)
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
@@ -132,6 +135,7 @@ class GenericWorkerStore(
KeyStore,
RoomWorkerStore,
DirectoryWorkerStore,
ThreadSubscriptionsWorkerStore,
PushRulesWorkerStore,
ApplicationServiceTransactionWorkerStore,
ApplicationServiceWorkerStore,

View File

@@ -581,3 +581,7 @@ class ExperimentalConfig(Config):
# MSC4155: Invite filtering
self.msc4155_enabled: bool = experimental.get("msc4155_enabled", False)
# MSC4306: Thread Subscriptions
# (and MSC4308: sliding sync extension for thread subscriptions)
self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False)

View File

@@ -174,6 +174,10 @@ class WriterLocations:
default=[MAIN_PROCESS_INSTANCE_NAME],
converter=_instance_to_list_converter,
)
thread_subscriptions: List[str] = attr.ib(
default=["master"],
converter=_instance_to_list_converter,
)
@attr.s(auto_attribs=True)

View File

@@ -358,6 +358,7 @@ class AdminHandler:
user_id: str,
rooms: list,
requester: JsonMapping,
use_admin: bool,
reason: Optional[str],
limit: Optional[int],
) -> str:
@@ -368,6 +369,7 @@ class AdminHandler:
user_id: the user ID of the user whose events should be redacted
rooms: the rooms in which to redact the user's events
requester: the user requesting the events
use_admin: whether to use the admin account to issue the redactions
reason: reason for requesting the redaction, ie spam, etc
limit: limit on the number of events in each room to redact
@@ -395,6 +397,7 @@ class AdminHandler:
"rooms": rooms,
"requester": requester,
"user_id": user_id,
"use_admin": use_admin,
"reason": reason,
"limit": limit,
},
@@ -426,9 +429,17 @@ class AdminHandler:
user_id = task.params.get("user_id")
assert user_id is not None
# puppet the user if they're ours, otherwise use admin to redact
use_admin = task.params.get("use_admin", False)
# default to puppeting the user unless they are not local or it's been requested to
# use the admin user to issue the redactions
requester_id = (
admin.user.to_string()
if use_admin or not self.hs.is_mine_id(user_id)
else user_id
)
requester = create_requester(
user_id if self.hs.is_mine_id(user_id) else admin.user.to_string(),
requester_id,
authenticated_entity=admin.user.to_string(),
)

View File

@@ -220,6 +220,7 @@ class AuthHandler:
self._password_localdb_enabled = hs.config.auth.password_localdb_enabled
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
self._account_validity_handler = hs.get_account_validity_handler()
self._pusher_pool = hs.get_pusherpool()
# Ratelimiter for failed auth during UIA. Uses same ratelimit config
# as per `rc_login.failed_attempts`.
@@ -1652,7 +1653,7 @@ class AuthHandler:
)
if medium == "email":
await self.store.delete_pusher_by_app_id_pushkey_user_id(
await self._pusher_pool.remove_pusher(
app_id="m.email", pushkey=address, user_id=user_id
)

View File

@@ -25,6 +25,9 @@ from typing import TYPE_CHECKING, Optional
from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.deactivate_account import (
ReplicationNotifyAccountDeactivatedServlet,
)
from synapse.types import Codes, Requester, UserID, create_requester
if TYPE_CHECKING:
@@ -44,6 +47,7 @@ class DeactivateAccountHandler:
self._room_member_handler = hs.get_room_member_handler()
self._identity_handler = hs.get_identity_handler()
self._profile_handler = hs.get_profile_handler()
self._pusher_pool = hs.get_pusherpool()
self.user_directory_handler = hs.get_user_directory_handler()
self._server_name = hs.hostname
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
@@ -52,10 +56,16 @@ class DeactivateAccountHandler:
self._user_parter_running = False
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
self._notify_account_deactivated_client = None
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
if hs.config.worker.run_background_tasks:
if hs.config.worker.worker_app is None:
hs.get_reactor().callWhenRunning(self._start_user_parting)
else:
self._notify_account_deactivated_client = (
ReplicationNotifyAccountDeactivatedServlet.make_client(hs)
)
self._account_validity_enabled = (
hs.config.account_validity.account_validity_enabled
@@ -145,7 +155,7 @@ class DeactivateAccountHandler:
# Most of the pushers will have been deleted when we logged out the
# associated devices above, but we still need to delete pushers not
# associated with devices, e.g. email pushers.
await self.store.delete_all_pushers_for_user(user_id)
await self._pusher_pool.delete_all_pushers_for_user(user_id)
# Add the user to a table of users pending deactivation (ie.
# removal from all the rooms they're a member of)
@@ -169,10 +179,6 @@ class DeactivateAccountHandler:
logger.info("Marking %s as erased", user_id)
await self.store.mark_user_erased(user_id)
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
self._start_user_parting()
# Reject all pending invites and knocks for the user, so that the
# user doesn't show up in the "invited" section of rooms' members list.
await self._reject_pending_invites_and_knocks_for_user(user_id)
@@ -187,18 +193,43 @@ class DeactivateAccountHandler:
# Remove account data (including ignored users and push rules).
await self.store.purge_account_data_for_user(user_id)
# Remove thread subscriptions for the user
await self.store.purge_thread_subscription_settings_for_user(user_id)
# Delete any server-side backup keys
await self.store.bulk_delete_backup_keys_and_versions_for_user(user_id)
# Notify modules and start the room parting process.
await self.notify_account_deactivated(user_id, by_admin=by_admin)
return identity_server_supports_unbinding
async def notify_account_deactivated(
self,
user_id: str,
by_admin: bool = False,
) -> None:
"""Notify modules and start the room parting process.
Goes through replication if this is not the main process.
"""
if self._notify_account_deactivated_client is not None:
await self._notify_account_deactivated_client(
user_id=user_id,
by_admin=by_admin,
)
return
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
self._start_user_parting()
# Let modules know the user has been deactivated.
await self._third_party_rules.on_user_deactivation_status_changed(
user_id,
True,
by_admin,
by_admin=by_admin,
)
return identity_server_supports_unbinding
async def _reject_pending_invites_and_knocks_for_user(self, user_id: str) -> None:
"""Reject pending invites and knocks addressed to a given user ID.

View File

@@ -0,0 +1,126 @@
import logging
from typing import TYPE_CHECKING, Optional
from synapse.api.errors import AuthError, NotFoundError
from synapse.storage.databases.main.thread_subscriptions import ThreadSubscription
from synapse.types import UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ThreadSubscriptionsHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.event_handler = hs.get_event_handler()
self.auth = hs.get_auth()
async def get_thread_subscription_settings(
self,
user_id: UserID,
room_id: str,
thread_root_event_id: str,
) -> Optional[ThreadSubscription]:
"""Get thread subscription settings for a specific thread and user.
Checks that the thread root is both a real event and also that it is visible
to the user.
Args:
user_id: The ID of the user
thread_root_event_id: The event ID of the thread root
Returns:
A `ThreadSubscription` containing the active subscription settings or None if not set
"""
# First check that the user can access the thread root event
# and that it exists
try:
event = await self.event_handler.get_event(
user_id, room_id, thread_root_event_id
)
if event is None:
raise NotFoundError("No such thread root")
except AuthError:
raise NotFoundError("No such thread root")
return await self.store.get_subscription_for_thread(
user_id.to_string(), event.room_id, thread_root_event_id
)
async def subscribe_user_to_thread(
self,
user_id: UserID,
room_id: str,
thread_root_event_id: str,
*,
automatic: bool,
) -> Optional[int]:
"""Sets or updates a user's subscription settings for a specific thread root.
Args:
requester_user_id: The ID of the user whose settings are being updated.
thread_root_event_id: The event ID of the thread root.
automatic: whether the user was subscribed by an automatic decision by
their client.
Returns:
The stream ID for this update, if the update isn't no-opped.
Raises:
NotFoundError if the user cannot access the thread root event, or it isn't
known to this homeserver.
"""
# First check that the user can access the thread root event
# and that it exists
try:
event = await self.event_handler.get_event(
user_id, room_id, thread_root_event_id
)
if event is None:
raise NotFoundError("No such thread root")
except AuthError:
logger.info("rejecting thread subscriptions change (thread not accessible)")
raise NotFoundError("No such thread root")
return await self.store.subscribe_user_to_thread(
user_id.to_string(),
event.room_id,
thread_root_event_id,
automatic=automatic,
)
async def unsubscribe_user_from_thread(
self, user_id: UserID, room_id: str, thread_root_event_id: str
) -> Optional[int]:
"""Clears a user's subscription settings for a specific thread root.
Args:
requester_user_id: The ID of the user whose settings are being updated.
thread_root_event_id: The event ID of the thread root.
Returns:
The stream ID for this update, if the update isn't no-opped.
Raises:
NotFoundError if the user cannot access the thread root event, or it isn't
known to this homeserver.
"""
# First check that the user can access the thread root event
# and that it exists
try:
event = await self.event_handler.get_event(
user_id, room_id, thread_root_event_id
)
if event is None:
raise NotFoundError("No such thread root")
except AuthError:
logger.info("rejecting thread subscriptions change (thread not accessible)")
raise NotFoundError("No such thread root")
return await self.store.unsubscribe_user_from_thread(
user_id.to_string(),
event.room_id,
thread_root_event_id,
)

View File

@@ -380,12 +380,13 @@ async def respond_with_multipart_responder(
try:
await responder.write_to_consumer(multipart_consumer)
except ConsumerRequestedStopError as e:
logger.debug("Failed to write to consumer: %s %s", type(e), e)
# Unregister the producer, if it has one, so Twisted doesn't complain
if request.producer:
request.unregisterProducer()
except Exception as e:
# The majority of the time this will be due to the client having gone
# away. Unfortunately, Twisted simply throws a generic exception at us
# in that case.
logger.warning("Failed to write to consumer: %s %s", type(e), e)
# Unregister the producer, if it has one, so Twisted doesn't complain
if request.producer:
request.unregisterProducer()
@@ -426,12 +427,13 @@ async def respond_with_responder(
add_file_headers(request, media_type, file_size, upload_name)
try:
await responder.write_to_consumer(request)
except ConsumerRequestedStopError as e:
logger.debug("Failed to write to consumer: %s %s", type(e), e)
# Unregister the producer, if it has one, so Twisted doesn't complain
if request.producer:
request.unregisterProducer()
except Exception as e:
# The majority of the time this will be due to the client having gone
# away. Unfortunately, Twisted simply throws a generic exception at us
# in that case.
logger.warning("Failed to write to consumer: %s %s", type(e), e)
# Unregister the producer, if it has one, so Twisted doesn't complain
if request.producer:
request.unregisterProducer()
@@ -674,6 +676,10 @@ def _parseparam(s: bytes) -> Generator[bytes, None, None]:
s = s[end:]
class ConsumerRequestedStopError(Exception):
"""A consumer asked us to stop producing"""
@implementer(interfaces.IPushProducer)
class ThreadedFileSender:
"""
@@ -751,7 +757,9 @@ class ThreadedFileSender:
self.wakeup_event.set()
if not self.deferred.called:
self.deferred.errback(Exception("Consumer asked us to stop producing"))
self.deferred.errback(
ConsumerRequestedStopError("Consumer asked us to stop producing")
)
async def start_read_loop(self) -> None:
"""This is the loop that drives reading/writing"""

View File

@@ -31,7 +31,10 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.replication.http.push import ReplicationRemovePusherRestServlet
from synapse.replication.http.push import (
ReplicationDeleteAllPushersForUserRestServlet,
ReplicationRemovePusherRestServlet,
)
from synapse.types import JsonDict, RoomStreamToken, StrCollection
from synapse.util.async_helpers import concurrently_execute
from synapse.util.threepids import canonicalise_email
@@ -78,10 +81,14 @@ class PusherPool:
# We can only delete pushers on master.
self._remove_pusher_client = None
self._delete_all_pushers_for_user_client = None
if hs.config.worker.worker_app:
self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client(
hs
)
self._delete_all_pushers_for_user_client = (
ReplicationDeleteAllPushersForUserRestServlet.make_client(hs)
)
# Record the last stream ID that we were poked about so we can get
# changes since then. We set this to the current max stream ID on
@@ -454,6 +461,13 @@ class PusherPool:
app_id, pushkey, user_id
)
async def delete_all_pushers_for_user(self, user_id: str) -> None:
"""Deletes all pushers for a user."""
if self._delete_all_pushers_for_user_client is not None:
await self._delete_all_pushers_for_user_client(user_id=user_id)
else:
await self.store.delete_all_pushers_for_user(user_id=user_id)
def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
"""Stops a pusher with the given app ID and push key if one is running.

View File

@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING
from synapse.http.server import JsonResource
from synapse.replication.http import (
account_data,
deactivate_account,
delayed_events,
devices,
federation,
@@ -66,3 +67,4 @@ class ReplicationRestResource(JsonResource):
login.register_servlets(hs, self)
register.register_servlets(hs, self)
delayed_events.register_servlets(hs, self)
deactivate_account.register_servlets(hs, self)

View File

@@ -0,0 +1,81 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import TYPE_CHECKING, Tuple
from twisted.web.server import Request
from synapse.http.server import HttpServer
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ReplicationNotifyAccountDeactivatedServlet(ReplicationEndpoint):
"""Notify that an account has been deactivated.
Request format:
POST /_synapse/replication/notify_account_deactivated/:user_id
{
"by_admin": true,
}
"""
NAME = "notify_account_deactivated"
PATH_ARGS = ("user_id",)
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.deactivate_account_handler = hs.get_deactivate_account_handler()
@staticmethod
async def _serialize_payload( # type: ignore[override]
user_id: str,
by_admin: bool,
) -> JsonDict:
"""
Args:
user_id: The user ID which has been deactivated.
by_admin: Whether the user was deactivated by an admin.
"""
return {
"by_admin": by_admin,
}
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> Tuple[int, JsonDict]:
by_admin = content["by_admin"]
await self.deactivate_account_handler.notify_account_deactivated(
user_id, by_admin=by_admin
)
return 200, {}
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationNotifyAccountDeactivatedServlet(hs).register(http_server)

View File

@@ -118,6 +118,39 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
return 200, {}
class ReplicationDeleteAllPushersForUserRestServlet(ReplicationEndpoint):
"""Deletes all pushers for a user.
Request format:
POST /_synapse/replication/delete_all_pushers_for_user/:user_id
{}
"""
NAME = "delete_all_pushers_for_user"
PATH_ARGS = ("user_id",)
CACHE = False
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self._store = hs.get_datastores().main
@staticmethod
async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override]
return {}
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> Tuple[int, JsonDict]:
await self._store.delete_all_pushers_for_user(user_id)
return 200, {}
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationRemovePusherRestServlet(hs).register(http_server)
ReplicationCopyPusherRestServlet(hs).register(http_server)
ReplicationDeleteAllPushersForUserRestServlet(hs).register(http_server)

View File

@@ -72,7 +72,10 @@ from synapse.replication.tcp.streams import (
ToDeviceStream,
TypingStream,
)
from synapse.replication.tcp.streams._base import DeviceListsStream
from synapse.replication.tcp.streams._base import (
DeviceListsStream,
ThreadSubscriptionsStream,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -186,6 +189,15 @@ class ReplicationCommandHandler:
continue
if isinstance(stream, ThreadSubscriptionsStream):
if (
hs.get_instance_name()
in hs.config.worker.writers.thread_subscriptions
):
self._streams_to_replicate.append(stream)
continue
if isinstance(stream, DeviceListsStream):
if hs.get_instance_name() in hs.config.worker.writers.device_lists:
self._streams_to_replicate.append(stream)

View File

@@ -41,6 +41,7 @@ from synapse.replication.tcp.streams._base import (
PushRulesStream,
ReceiptsStream,
Stream,
ThreadSubscriptionsStream,
ToDeviceStream,
TypingStream,
)
@@ -67,6 +68,7 @@ STREAMS_MAP = {
ToDeviceStream,
FederationStream,
AccountDataStream,
ThreadSubscriptionsStream,
UnPartialStatedRoomStream,
UnPartialStatedEventStream,
)
@@ -86,6 +88,7 @@ __all__ = [
"DeviceListsStream",
"ToDeviceStream",
"AccountDataStream",
"ThreadSubscriptionsStream",
"UnPartialStatedRoomStream",
"UnPartialStatedEventStream",
]

View File

@@ -723,3 +723,46 @@ class AccountDataStream(_StreamFromIdGen):
heapq.merge(room_rows, global_rows, tag_rows, key=lambda row: row[0])
)
return updates, to_token, limited
class ThreadSubscriptionsStream(_StreamFromIdGen):
"""A thread subscription was changed."""
@attr.s(slots=True, auto_attribs=True)
class ThreadSubscriptionsStreamRow:
"""Stream to inform workers about changes to thread subscriptions."""
user_id: str
room_id: str
event_id: str # The event ID of the thread root
NAME = "thread_subscriptions"
ROW_TYPE = ThreadSubscriptionsStreamRow
def __init__(self, hs: Any):
self.store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
self._update_function,
self.store._thread_subscriptions_id_gen,
)
async def _update_function(
self, instance_name: str, from_token: int, to_token: int, limit: int
) -> StreamUpdateResult:
updates = await self.store.get_updated_thread_subscriptions(
from_token, to_token, limit
)
rows = [
(
stream_id,
# These are the args to `ThreadSubscriptionsStreamRow`
(user_id, room_id, event_id),
)
for stream_id, user_id, room_id, event_id in updates
]
if not rows:
return [], to_token, False
return rows, rows[-1][0], len(updates) == limit

View File

@@ -1414,7 +1414,7 @@ class RedactUser(RestServlet):
"""
Redact all the events of a given user in the given rooms or if empty dict is provided
then all events in all rooms user is member of. Kicks off a background process and
returns an id that can be used to check on the progress of the redaction progress
returns an id that can be used to check on the progress of the redaction progress.
"""
PATTERNS = admin_patterns("/user/(?P<user_id>[^/]*)/redact")
@@ -1428,6 +1428,7 @@ class RedactUser(RestServlet):
rooms: List[StrictStr]
reason: Optional[StrictStr]
limit: Optional[StrictInt]
use_admin: Optional[StrictBool]
async def on_POST(
self, request: SynapseRequest, user_id: str
@@ -1455,8 +1456,12 @@ class RedactUser(RestServlet):
)
rooms = current_rooms + banned_rooms
use_admin = body.use_admin
if not use_admin:
use_admin = False
redact_id = await self.admin_handler.start_redact_events(
user_id, rooms, requester.serialize(), body.reason, limit
user_id, rooms, requester.serialize(), use_admin, body.reason, limit
)
return HTTPStatus.OK, {"redact_id": redact_id}

View File

@@ -896,23 +896,26 @@ class AccountStatusRestServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ThreepidRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)
if not hs.config.experimental.msc3861.enabled:
DeactivateAccountRestServlet(hs).register(http_server)
# These servlets are only registered on the main process
if hs.config.worker.worker_app is None:
ThreepidBindRestServlet(hs).register(http_server)
ThreepidUnbindRestServlet(hs).register(http_server)
if not hs.config.experimental.msc3861.enabled:
EmailPasswordRequestTokenRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
PasswordRestServlet(hs).register(http_server)
EmailThreepidRequestTokenRestServlet(hs).register(http_server)
MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
ThreepidRestServlet(hs).register(http_server)
if hs.config.worker.worker_app is None:
ThreepidBindRestServlet(hs).register(http_server)
ThreepidUnbindRestServlet(hs).register(http_server)
if not hs.config.experimental.msc3861.enabled:
ThreepidAddRestServlet(hs).register(http_server)
ThreepidDeleteRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)
if hs.config.worker.worker_app is None and hs.config.experimental.msc3720_enabled:
if hs.config.experimental.msc3720_enabled:
AccountStatusRestServlet(hs).register(http_server)

View File

@@ -0,0 +1,98 @@
from http import HTTPStatus
from typing import Tuple
from synapse._pydantic_compat import StrictBool
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.server import HttpServer
from synapse.http.servlet import (
RestServlet,
parse_and_validate_json_object_from_request,
)
from synapse.http.site import SynapseRequest
from synapse.rest.client._base import client_patterns
from synapse.server import HomeServer
from synapse.types import JsonDict, RoomID
from synapse.types.rest import RequestBodyModel
class ThreadSubscriptionsRestServlet(RestServlet):
PATTERNS = client_patterns(
"/io.element.msc4306/rooms/(?P<room_id>[^/]*)/thread/(?P<thread_root_id>[^/]*)/subscription$",
unstable=True,
releases=(),
)
CATEGORY = "Thread Subscriptions requests (unstable)"
def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.is_mine = hs.is_mine
self.store = hs.get_datastores().main
self.handler = hs.get_thread_subscriptions_handler()
class PutBody(RequestBodyModel):
automatic: StrictBool
async def on_GET(
self, request: SynapseRequest, room_id: str, thread_root_id: str
) -> Tuple[int, JsonDict]:
RoomID.from_string(room_id)
if not thread_root_id.startswith("$"):
raise SynapseError(
HTTPStatus.BAD_REQUEST, "Invalid event ID", errcode=Codes.INVALID_PARAM
)
requester = await self.auth.get_user_by_req(request)
subscription = await self.handler.get_thread_subscription_settings(
requester.user,
room_id,
thread_root_id,
)
if subscription is None:
raise NotFoundError("Not subscribed.")
return HTTPStatus.OK, {"automatic": subscription.automatic}
async def on_PUT(
self, request: SynapseRequest, room_id: str, thread_root_id: str
) -> Tuple[int, JsonDict]:
RoomID.from_string(room_id)
if not thread_root_id.startswith("$"):
raise SynapseError(
HTTPStatus.BAD_REQUEST, "Invalid event ID", errcode=Codes.INVALID_PARAM
)
requester = await self.auth.get_user_by_req(request)
body = parse_and_validate_json_object_from_request(request, self.PutBody)
await self.handler.subscribe_user_to_thread(
requester.user,
room_id,
thread_root_id,
automatic=body.automatic,
)
return HTTPStatus.OK, {}
async def on_DELETE(
self, request: SynapseRequest, room_id: str, thread_root_id: str
) -> Tuple[int, JsonDict]:
RoomID.from_string(room_id)
if not thread_root_id.startswith("$"):
raise SynapseError(
HTTPStatus.BAD_REQUEST, "Invalid event ID", errcode=Codes.INVALID_PARAM
)
requester = await self.auth.get_user_by_req(request)
await self.handler.unsubscribe_user_from_thread(
requester.user,
room_id,
thread_root_id,
)
return HTTPStatus.OK, {}
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if hs.config.experimental.msc4306_enabled:
ThreadSubscriptionsRestServlet(hs).register(http_server)

View File

@@ -30,6 +30,7 @@ from synapse.rest.synapse.client.pick_username import pick_username_resource
from synapse.rest.synapse.client.rendezvous import MSC4108RendezvousSessionResource
from synapse.rest.synapse.client.sso_register import SsoRegisterResource
from synapse.rest.synapse.client.unsubscribe import UnsubscribeResource
from synapse.rest.synapse.mas import MasResource
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -60,6 +61,7 @@ def build_synapse_client_resource_tree(hs: "HomeServer") -> Mapping[str, Resourc
from synapse.rest.synapse.client.jwks import JwksResource
resources["/_synapse/jwks"] = JwksResource(hs)
resources["/_synapse/mas"] = MasResource(hs)
# provider-specific SSO bits. Only load these if they are enabled, since they
# rely on optional dependencies.

View File

@@ -0,0 +1,71 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl_3.0.html>.
#
#
import logging
from typing import TYPE_CHECKING
from twisted.web.resource import Resource
from synapse.rest.synapse.mas.devices import (
MasDeleteDeviceResource,
MasSyncDevicesResource,
MasUpdateDeviceDisplayNameResource,
MasUpsertDeviceResource,
)
from synapse.rest.synapse.mas.users import (
MasAllowCrossSigningResetResource,
MasDeleteUserResource,
MasIsLocalpartAvailableResource,
MasProvisionUserResource,
MasQueryUserResource,
MasReactivateUserResource,
MasSetDisplayNameResource,
MasUnsetDisplayNameResource,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class MasResource(Resource):
"""
Provides endpoints for MAS to manage user accounts and devices.
All endpoints are mounted under the path `/_synapse/mas/` and only work
using the MAS admin token.
"""
def __init__(self, hs: "HomeServer"):
Resource.__init__(self)
self.putChild(b"query_user", MasQueryUserResource(hs))
self.putChild(b"provision_user", MasProvisionUserResource(hs))
self.putChild(b"is_localpart_available", MasIsLocalpartAvailableResource(hs))
self.putChild(b"delete_user", MasDeleteUserResource(hs))
self.putChild(b"upsert_device", MasUpsertDeviceResource(hs))
self.putChild(b"delete_device", MasDeleteDeviceResource(hs))
self.putChild(
b"update_device_display_name", MasUpdateDeviceDisplayNameResource(hs)
)
self.putChild(b"sync_devices", MasSyncDevicesResource(hs))
self.putChild(b"reactivate_user", MasReactivateUserResource(hs))
self.putChild(b"set_displayname", MasSetDisplayNameResource(hs))
self.putChild(b"unset_displayname", MasUnsetDisplayNameResource(hs))
self.putChild(
b"allow_cross_signing_reset", MasAllowCrossSigningResetResource(hs)
)

View File

@@ -0,0 +1,47 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl_3.0.html>.
#
#
from typing import TYPE_CHECKING, cast
from synapse.api.errors import SynapseError
from synapse.http.server import DirectServeJsonResource
if TYPE_CHECKING:
from synapse.app.generic_worker import GenericWorkerStore
from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
class MasBaseResource(DirectServeJsonResource):
def __init__(self, hs: "HomeServer"):
# Importing this module requires authlib, which is an optional
# dependency but required if msc3861 is enabled
from synapse.api.auth.msc3861_delegated import MSC3861DelegatedAuth
DirectServeJsonResource.__init__(self, extract_context=True)
auth = hs.get_auth()
assert isinstance(auth, MSC3861DelegatedAuth)
self.msc3861_auth = auth
self.store = cast("GenericWorkerStore", hs.get_datastores().main)
self.hostname = hs.hostname
def assert_request_is_from_mas(self, request: "SynapseRequest") -> None:
"""Assert that the request is coming from MAS itself, not a regular user.
Throws a 403 if the request is not coming from MAS.
"""
if not self.msc3861_auth.is_request_using_the_admin_token(request):
raise SynapseError(403, "This endpoint must only be called by MAS")

View File

@@ -0,0 +1,238 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl_3.0.html>.
#
#
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Optional, Tuple
from synapse._pydantic_compat import StrictStr
from synapse.api.errors import NotFoundError
from synapse.http.servlet import parse_and_validate_json_object_from_request
from synapse.types import JsonDict, UserID
from synapse.types.rest import RequestBodyModel
if TYPE_CHECKING:
from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
from ._base import MasBaseResource
logger = logging.getLogger(__name__)
class MasUpsertDeviceResource(MasBaseResource):
"""
Endpoint for MAS to create or update user devices.
Takes a localpart, device ID, and optional display name to create new devices
or update existing ones.
POST /_synapse/mas/upsert_device
{"localpart": "alice", "device_id": "DEVICE123", "display_name": "Alice's Phone"}
"""
def __init__(self, hs: "HomeServer"):
MasBaseResource.__init__(self, hs)
self.device_handler = hs.get_device_handler()
class PostBody(RequestBodyModel):
localpart: StrictStr
device_id: StrictStr
display_name: Optional[StrictStr]
async def _async_render_POST(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
user_id = UserID(body.localpart, self.hostname)
# Check the user exists
user = await self.store.get_user_by_id(user_id=str(user_id))
if user is None:
raise NotFoundError("User not found")
inserted = await self.device_handler.upsert_device(
user_id=str(user_id),
device_id=body.device_id,
display_name=body.display_name,
)
return HTTPStatus.CREATED if inserted else HTTPStatus.OK, {}
class MasDeleteDeviceResource(MasBaseResource):
"""
Endpoint for MAS to delete user devices.
Takes a localpart and device ID to remove the specified device from the user's account.
POST /_synapse/mas/delete_device
{"localpart": "alice", "device_id": "DEVICE123"}
"""
def __init__(self, hs: "HomeServer"):
MasBaseResource.__init__(self, hs)
self.device_handler = hs.get_device_handler()
class PostBody(RequestBodyModel):
localpart: StrictStr
device_id: StrictStr
async def _async_render_POST(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
user_id = UserID(body.localpart, self.hostname)
# Check the user exists
user = await self.store.get_user_by_id(user_id=str(user_id))
if user is None:
raise NotFoundError("User not found")
await self.device_handler.delete_devices(
user_id=str(user_id),
device_ids=[body.device_id],
)
return HTTPStatus.NO_CONTENT, {}
class MasUpdateDeviceDisplayNameResource(MasBaseResource):
"""
Endpoint for MAS to update a device's display name.
Takes a localpart, device ID, and new display name to update the device's name.
POST /_synapse/mas/update_device_display_name
{"localpart": "alice", "device_id": "DEVICE123", "display_name": "Alice's New Phone"}
"""
def __init__(self, hs: "HomeServer"):
MasBaseResource.__init__(self, hs)
self.device_handler = hs.get_device_handler()
class PostBody(RequestBodyModel):
localpart: StrictStr
device_id: StrictStr
display_name: StrictStr
async def _async_render_POST(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
user_id = UserID(body.localpart, self.hostname)
# Check the user exists
user = await self.store.get_user_by_id(user_id=str(user_id))
if user is None:
raise NotFoundError("User not found")
await self.device_handler.update_device(
user_id=str(user_id),
device_id=body.device_id,
content={"display_name": body.display_name},
)
return HTTPStatus.OK, {}
class MasSyncDevicesResource(MasBaseResource):
"""
Endpoint for MAS to synchronize a user's complete device list.
Takes a localpart and a set of device IDs to ensure the user's device list
matches the provided set by adding missing devices and removing extra ones.
POST /_synapse/mas/sync_devices
{"localpart": "alice", "devices": ["DEVICE123", "DEVICE456"]}
"""
def __init__(self, hs: "HomeServer"):
MasBaseResource.__init__(self, hs)
self.device_handler = hs.get_device_handler()
class PostBody(RequestBodyModel):
localpart: StrictStr
devices: set[StrictStr]
async def _async_render_POST(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
user_id = UserID(body.localpart, self.hostname)
# Check the user exists
user = await self.store.get_user_by_id(user_id=str(user_id))
if user is None:
raise NotFoundError("User not found")
current_devices = await self.store.get_devices_by_user(user_id=str(user_id))
current_devices_list = set(current_devices.keys())
target_device_list = set(body.devices)
to_add = target_device_list - current_devices_list
to_delete = current_devices_list - target_device_list
# Log what we're about to do to make it easier to debug if it stops
# mid-way, as this can be a long operation if there are a lot of devices
# to delete or to add.
if to_add and to_delete:
logger.info(
"Syncing %d devices for user %s will add %d devices and delete %d devices",
len(target_device_list),
user_id,
len(to_add),
len(to_delete),
)
elif to_add:
logger.info(
"Syncing %d devices for user %s will add %d devices",
len(target_device_list),
user_id,
len(to_add),
)
elif to_delete:
logger.info(
"Syncing %d devices for user %s will delete %d devices",
len(target_device_list),
user_id,
len(to_delete),
)
if to_delete:
await self.device_handler.delete_devices(
user_id=str(user_id), device_ids=to_delete
)
for device_id in to_add:
await self.device_handler.upsert_device(
user_id=str(user_id),
device_id=device_id,
)
return 200, {}

View File

@@ -0,0 +1,467 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl_3.0.html>.
#
#
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Optional, Tuple, TypedDict
from synapse._pydantic_compat import StrictBool, StrictStr, root_validator
from synapse.api.errors import NotFoundError, SynapseError
from synapse.http.servlet import (
parse_and_validate_json_object_from_request,
parse_string,
)
from synapse.types import JsonDict, UserID, UserInfo, create_requester
from synapse.types.rest import RequestBodyModel
if TYPE_CHECKING:
from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
from ._base import MasBaseResource
logger = logging.getLogger(__name__)
class MasQueryUserResource(MasBaseResource):
"""
Endpoint for MAS to query user information by localpart.
Takes a localpart parameter and returns user profile data including display name,
avatar URL, and account status (suspended/deactivated).
GET /_synapse/mas/query_user?localpart=alice
"""
def __init__(self, hs: "HomeServer"):
MasBaseResource.__init__(self, hs)
class Response(TypedDict):
user_id: str
display_name: Optional[str]
avatar_url: Optional[str]
is_suspended: bool
is_deactivated: bool
async def _async_render_GET(
self, request: "SynapseRequest"
) -> Tuple[int, Response]:
self.assert_request_is_from_mas(request)
localpart = parse_string(request, "localpart", required=True)
user_id = UserID(localpart, self.hostname)
user: Optional[UserInfo] = await self.store.get_user_by_id(user_id=str(user_id))
if user is None:
raise NotFoundError("User not found")
profile = await self.store.get_profileinfo(user_id=user_id)
return HTTPStatus.OK, self.Response(
user_id=user_id.to_string(),
display_name=profile.display_name,
avatar_url=profile.avatar_url,
is_suspended=user.suspended,
is_deactivated=user.is_deactivated,
)
class MasProvisionUserResource(MasBaseResource):
"""
Endpoint for MAS to create or update user accounts and their profile data.
Takes a localpart and optional profile fields (display name, avatar URL, email addresses).
Can create new users or update existing ones by setting or unsetting profile fields.
POST /_synapse/mas/provision_user
{"localpart": "alice", "set_displayname": "Alice", "set_emails": ["alice@example.com"]}
"""
def __init__(self, hs: "HomeServer"):
MasBaseResource.__init__(self, hs)
self.registration_handler = hs.get_registration_handler()
self.identity_handler = hs.get_identity_handler()
self.auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
self.clock = hs.get_clock()
self.auth = hs.get_auth()
class PostBody(RequestBodyModel):
localpart: StrictStr
unset_displayname: StrictBool = False
set_displayname: Optional[StrictStr] = None
unset_avatar_url: StrictBool = False
set_avatar_url: Optional[StrictStr] = None
unset_emails: StrictBool = False
set_emails: Optional[list[StrictStr]] = None
@root_validator(pre=True)
def validate_exclusive(cls, values: Any) -> Any:
if "unset_displayname" in values and "set_displayname" in values:
raise ValueError(
"Cannot specify both unset_displayname and set_displayname"
)
if "unset_avatar_url" in values and "set_avatar_url" in values:
raise ValueError(
"Cannot specify both unset_avatar_url and set_avatar_url"
)
if "unset_emails" in values and "set_emails" in values:
raise ValueError("Cannot specify both unset_emails and set_emails")
return values
async def _async_render_POST(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
localpart = body.localpart
user_id = UserID(localpart, self.hostname)
requester = create_requester(user_id=user_id)
existing_user = await self.store.get_user_by_id(user_id=str(user_id))
if existing_user is None:
created = True
await self.registration_handler.register_user(
localpart=localpart,
default_display_name=body.set_displayname,
bind_emails=body.set_emails,
by_admin=True,
)
else:
created = False
if body.unset_displayname:
await self.profile_handler.set_displayname(
target_user=user_id,
requester=requester,
new_displayname="",
by_admin=True,
)
elif body.set_displayname is not None:
await self.profile_handler.set_displayname(
target_user=user_id,
requester=requester,
new_displayname=body.set_displayname,
by_admin=True,
)
new_email_list: Optional[set[str]] = None
if body.unset_emails:
new_email_list = set()
elif body.set_emails is not None:
new_email_list = set(body.set_emails)
if new_email_list is not None:
medium = "email"
current_threepid_list = await self.store.user_get_threepids(
user_id=user_id.to_string()
)
current_email_list = {
t.address for t in current_threepid_list if t.medium == medium
}
to_delete = current_email_list - new_email_list
to_add = new_email_list - current_email_list
for address in to_delete:
await self.identity_handler.try_unbind_threepid(
mxid=user_id.to_string(),
medium=medium,
address=address,
id_server=None,
)
await self.auth_handler.delete_local_threepid(
user_id=user_id.to_string(),
medium=medium,
address=address,
)
current_time = self.clock.time_msec()
for address in to_add:
await self.auth_handler.add_threepid(
user_id=user_id.to_string(),
medium=medium,
address=address,
validated_at=current_time,
)
if body.unset_avatar_url:
await self.profile_handler.set_avatar_url(
target_user=user_id,
requester=requester,
new_avatar_url="",
by_admin=True,
)
elif body.set_avatar_url is not None:
await self.profile_handler.set_avatar_url(
target_user=user_id,
requester=requester,
new_avatar_url=body.set_avatar_url,
by_admin=True,
)
return HTTPStatus.CREATED if created else HTTPStatus.OK, {}
class MasIsLocalpartAvailableResource(MasBaseResource):
"""
Endpoint for MAS to check if a localpart is available for user registration.
Takes a localpart parameter and validates its format and availability,
checking for conflicts with existing users or application service namespaces.
GET /_synapse/mas/is_localpart_available?localpart=alice
"""
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.registration_handler = hs.get_registration_handler()
async def _async_render_GET(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
localpart = parse_string(request, "localpart")
if localpart is None:
raise SynapseError(400, "Missing localpart")
await self.registration_handler.check_username(localpart)
return HTTPStatus.OK, {}
class MasDeleteUserResource(MasBaseResource):
"""
Endpoint for MAS to delete/deactivate user accounts.
Takes a localpart and an erase flag to determine whether to deactivate
the account and optionally erase user data for compliance purposes.
POST /_synapse/mas/delete_user
{"localpart": "alice", "erase": true}
"""
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.deactivate_account_handler = hs.get_deactivate_account_handler()
class PostBody(RequestBodyModel):
localpart: StrictStr
erase: StrictBool
async def _async_render_POST(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
user_id = UserID(body.localpart, self.hostname)
# Check the user exists
user = await self.store.get_user_by_id(user_id=str(user_id))
if user is None:
raise NotFoundError("User not found")
await self.deactivate_account_handler.deactivate_account(
user_id=user_id.to_string(),
erase_data=body.erase,
requester=create_requester(user_id=user_id),
)
return HTTPStatus.OK, {}
class MasReactivateUserResource(MasBaseResource):
"""
Endpoint for MAS to reactivate previously deactivated user accounts.
Takes a localpart parameter to restore access to deactivated accounts.
POST /_synapse/mas/reactivate_user
{"localpart": "alice"}
"""
def __init__(self, hs: "HomeServer"):
MasBaseResource.__init__(self, hs)
self.deactivate_account_handler = hs.get_deactivate_account_handler()
class PostBody(RequestBodyModel):
localpart: StrictStr
async def _async_render_POST(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
user_id = UserID(body.localpart, self.hostname)
# Check the user exists
user = await self.store.get_user_by_id(user_id=str(user_id))
if user is None:
raise NotFoundError("User not found")
await self.deactivate_account_handler.activate_account(user_id=str(user_id))
return HTTPStatus.OK, {}
class MasSetDisplayNameResource(MasBaseResource):
"""
Endpoint for MAS to set a user's display name.
Takes a localpart and display name to update the user's profile.
POST /_synapse/mas/set_displayname
{"localpart": "alice", "displayname": "Alice"}
"""
def __init__(self, hs: "HomeServer"):
MasBaseResource.__init__(self, hs)
self.profile_handler = hs.get_profile_handler()
self.auth_handler = hs.get_auth_handler()
class PostBody(RequestBodyModel):
localpart: StrictStr
displayname: StrictStr
async def _async_render_POST(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
user_id = UserID(body.localpart, self.hostname)
# Check the user exists
user = await self.store.get_user_by_id(user_id=str(user_id))
if user is None:
raise NotFoundError("User not found")
requester = create_requester(user_id=user_id)
await self.profile_handler.set_displayname(
target_user=requester.user,
requester=requester,
new_displayname=body.displayname,
by_admin=True,
)
return HTTPStatus.OK, {}
class MasUnsetDisplayNameResource(MasBaseResource):
"""
Endpoint for MAS to clear a user's display name.
Takes a localpart parameter to remove the display name for the specified user.
POST /_synapse/mas/unset_displayname
{"localpart": "alice"}
"""
def __init__(self, hs: "HomeServer"):
MasBaseResource.__init__(self, hs)
self.profile_handler = hs.get_profile_handler()
self.auth_handler = hs.get_auth_handler()
class PostBody(RequestBodyModel):
localpart: StrictStr
async def _async_render_POST(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
user_id = UserID(body.localpart, self.hostname)
# Check the user exists
user = await self.store.get_user_by_id(user_id=str(user_id))
if user is None:
raise NotFoundError("User not found")
requester = create_requester(user_id=user_id)
await self.profile_handler.set_displayname(
target_user=requester.user,
requester=requester,
new_displayname="",
by_admin=True,
)
return HTTPStatus.OK, {}
class MasAllowCrossSigningResetResource(MasBaseResource):
"""
Endpoint for MAS to allow cross-signing key reset without user interaction.
Takes a localpart parameter to temporarily allow cross-signing key replacement
without requiring User-Interactive Authentication (UIA).
POST /_synapse/mas/allow_cross_signing_reset
{"localpart": "alice"}
"""
REPLACEMENT_PERIOD_MS = 10 * 60 * 1000 # 10 minutes
def __init__(self, hs: "HomeServer"):
MasBaseResource.__init__(self, hs)
self.auth_handler = hs.get_auth_handler()
class PostBody(RequestBodyModel):
localpart: StrictStr
async def _async_render_POST(
self, request: "SynapseRequest"
) -> Tuple[int, JsonDict]:
self.assert_request_is_from_mas(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
user_id = UserID(body.localpart, self.hostname)
# Check the user exists
user = await self.store.get_user_by_id(user_id=str(user_id))
if user is None:
raise NotFoundError("User not found")
timestamp = (
await self.store.allow_master_cross_signing_key_replacement_without_uia(
user_id=str(user_id),
duration_ms=self.REPLACEMENT_PERIOD_MS,
)
)
if timestamp is None:
# If there are no cross-signing keys, this is a no-op, but we should log
logger.warning(
"User %s has no master cross-signing key", user_id.to_string()
)
return HTTPStatus.OK, {}

View File

@@ -117,6 +117,7 @@ from synapse.handlers.sliding_sync import SlidingSyncHandler
from synapse.handlers.sso import SsoHandler
from synapse.handlers.stats import StatsHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.thread_subscriptions import ThreadSubscriptionsHandler
from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
from synapse.handlers.user_directory import UserDirectoryHandler
from synapse.handlers.worker_lock import WorkerLocksHandler
@@ -789,6 +790,10 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_timestamp_lookup_handler(self) -> TimestampLookupHandler:
return TimestampLookupHandler(self)
@cache_in_self
def get_thread_subscriptions_handler(self) -> ThreadSubscriptionsHandler:
return ThreadSubscriptionsHandler(self)
@cache_in_self
def get_registration_handler(self) -> RegistrationHandler:
return RegistrationHandler(self)

View File

@@ -19,7 +19,6 @@
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import TYPE_CHECKING, List, Optional, Tuple, Union, cast
@@ -35,6 +34,9 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
from synapse.storage.databases.main.stats import UserSortOrder
from synapse.storage.databases.main.thread_subscriptions import (
ThreadSubscriptionsWorkerStore,
)
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.types import Cursor
from synapse.types import get_domain_from_id
@@ -141,6 +143,7 @@ class DataStore(
SearchStore,
TagsStore,
AccountDataStore,
ThreadSubscriptionsWorkerStore,
PushRulesWorkerStore,
StreamWorkerStore,
OpenIdStore,

View File

@@ -2986,6 +2986,10 @@ class PersistEventsStore:
# Upsert into the threads table, but only overwrite the value if the
# new event is of a later topological order OR if the topological
# ordering is equal, but the stream ordering is later.
# (Note by definition that the stream ordering will always be later
# unless this is a backfilled event [= negative stream ordering]
# because we are only persisting this event now and stream_orderings
# are strictly monotonically increasing)
sql = """
INSERT INTO threads (room_id, thread_id, latest_event_id, topological_ordering, stream_ordering)
VALUES (?, ?, ?, ?, ?)

View File

@@ -673,6 +673,7 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore):
desc="delete_account_validity_for_user",
)
@cached(max_entries=100000)
async def is_server_admin(self, user: UserID) -> bool:
"""Determines if a user is an admin of this homeserver.
@@ -707,6 +708,9 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore):
self._invalidate_cache_and_stream(
txn, self.get_user_by_id, (user.to_string(),)
)
self._invalidate_cache_and_stream(
txn, self.is_server_admin, (user.to_string(),)
)
await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn)
@@ -2596,6 +2600,36 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore):
await self.db_pool.runInteraction("delete_access_token", f)
async def user_set_password_hash(
self, user_id: str, password_hash: Optional[str]
) -> None:
"""
NB. This does *not* evict any cache because the one use for this
removes most of the entries subsequently anyway so it would be
pointless. Use flush_user separately.
"""
def user_set_password_hash_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_update_one_txn(
txn, "users", {"name": user_id}, {"password_hash": password_hash}
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
await self.db_pool.runInteraction(
"user_set_password_hash", user_set_password_hash_txn
)
async def add_user_pending_deactivation(self, user_id: str) -> None:
"""
Adds a user to the table of users who need to be parted from all the rooms they're
in
"""
await self.db_pool.simple_insert(
"users_pending_deactivation",
values={"user_id": user_id},
desc="add_user_pending_deactivation",
)
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
def __init__(
@@ -2820,25 +2854,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
return next_id
async def user_set_password_hash(
self, user_id: str, password_hash: Optional[str]
) -> None:
"""
NB. This does *not* evict any cache because the one use for this
removes most of the entries subsequently anyway so it would be
pointless. Use flush_user separately.
"""
def user_set_password_hash_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_update_one_txn(
txn, "users", {"name": user_id}, {"password_hash": password_hash}
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
await self.db_pool.runInteraction(
"user_set_password_hash", user_set_password_hash_txn
)
async def user_set_consent_version(
self, user_id: str, consent_version: str
) -> None:
@@ -2891,17 +2906,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
await self.db_pool.runInteraction("user_set_consent_server_notice_sent", f)
async def add_user_pending_deactivation(self, user_id: str) -> None:
"""
Adds a user to the table of users who need to be parted from all the rooms they're
in
"""
await self.db_pool.simple_insert(
"users_pending_deactivation",
values={"user_id": user_id},
desc="add_user_pending_deactivation",
)
async def validate_threepid_session(
self, session_id: str, client_secret: str, token: str, current_ts: int
) -> Optional[str]:

View File

@@ -0,0 +1,382 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
import logging
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
cast,
)
import attr
from synapse.replication.tcp.streams._base import ThreadSubscriptionsStream
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ThreadSubscription:
automatic: bool
"""
whether the subscription was made automatically (as opposed to by manual
action from the user)
"""
class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self._can_write_to_thread_subscriptions = (
self._instance_name in hs.config.worker.writers.thread_subscriptions
)
self._thread_subscriptions_id_gen: MultiWriterIdGenerator = (
MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="thread_subscriptions",
instance_name=self._instance_name,
tables=[
("thread_subscriptions", "instance_name", "stream_id"),
],
sequence_name="thread_subscriptions_sequence",
writers=hs.config.worker.writers.thread_subscriptions,
)
)
def process_replication_rows(
self,
stream_name: str,
instance_name: str,
token: int,
rows: Iterable[Any],
) -> None:
if stream_name == ThreadSubscriptionsStream.NAME:
for row in rows:
self.get_subscription_for_thread.invalidate(
(row.user_id, row.room_id, row.event_id)
)
super().process_replication_rows(stream_name, instance_name, token, rows)
def process_replication_position(
self, stream_name: str, instance_name: str, token: int
) -> None:
if stream_name == ThreadSubscriptionsStream.NAME:
self._thread_subscriptions_id_gen.advance(instance_name, token)
super().process_replication_position(stream_name, instance_name, token)
async def subscribe_user_to_thread(
self, user_id: str, room_id: str, thread_root_event_id: str, *, automatic: bool
) -> Optional[int]:
"""Updates a user's subscription settings for a specific thread root.
If no change would be made to the subscription, does not produce any database change.
Args:
user_id: The ID of the user whose settings are being updated.
room_id: The ID of the room the thread root belongs to.
thread_root_event_id: The event ID of the thread root.
automatic: Whether the subscription was performed automatically by the user's client.
Only `False` will overwrite an existing value of automatic for a subscription row.
Returns:
The stream ID for this update, if the update isn't no-opped.
"""
assert self._can_write_to_thread_subscriptions
def _subscribe_user_to_thread_txn(txn: LoggingTransaction) -> Optional[int]:
already_automatic = self.db_pool.simple_select_one_onecol_txn(
txn,
table="thread_subscriptions",
keyvalues={
"user_id": user_id,
"event_id": thread_root_event_id,
"room_id": room_id,
"subscribed": True,
},
retcol="automatic",
allow_none=True,
)
if already_automatic is None:
already_subscribed = False
already_automatic = True
else:
already_subscribed = True
# convert int (SQLite bool) to Python bool
already_automatic = bool(already_automatic)
if already_subscribed and already_automatic == automatic:
# there is nothing we need to do here
return None
stream_id = self._thread_subscriptions_id_gen.get_next_txn(txn)
values: Dict[str, Optional[Union[bool, int, str]]] = {
"subscribed": True,
"stream_id": stream_id,
"instance_name": self._instance_name,
"automatic": already_automatic and automatic,
}
self.db_pool.simple_upsert_txn(
txn,
table="thread_subscriptions",
keyvalues={
"user_id": user_id,
"event_id": thread_root_event_id,
"room_id": room_id,
},
values=values,
)
txn.call_after(
self.get_subscription_for_thread.invalidate,
(user_id, room_id, thread_root_event_id),
)
return stream_id
return await self.db_pool.runInteraction(
"subscribe_user_to_thread", _subscribe_user_to_thread_txn
)
async def unsubscribe_user_from_thread(
self, user_id: str, room_id: str, thread_root_event_id: str
) -> Optional[int]:
"""Unsubscribes a user from a thread.
If no change would be made to the subscription, does not produce any database change.
Args:
user_id: The ID of the user whose settings are being updated.
room_id: The ID of the room the thread root belongs to.
thread_root_event_id: The event ID of the thread root.
Returns:
The stream ID for this update, if the update isn't no-opped.
"""
assert self._can_write_to_thread_subscriptions
def _unsubscribe_user_from_thread_txn(txn: LoggingTransaction) -> Optional[int]:
already_subscribed = self.db_pool.simple_select_one_onecol_txn(
txn,
table="thread_subscriptions",
keyvalues={
"user_id": user_id,
"event_id": thread_root_event_id,
"room_id": room_id,
},
retcol="subscribed",
allow_none=True,
)
if already_subscribed is None or already_subscribed is False:
# there is nothing we need to do here
return None
stream_id = self._thread_subscriptions_id_gen.get_next_txn(txn)
self.db_pool.simple_update_txn(
txn,
table="thread_subscriptions",
keyvalues={
"user_id": user_id,
"event_id": thread_root_event_id,
"room_id": room_id,
"subscribed": True,
},
updatevalues={
"subscribed": False,
"stream_id": stream_id,
"instance_name": self._instance_name,
},
)
txn.call_after(
self.get_subscription_for_thread.invalidate,
(user_id, room_id, thread_root_event_id),
)
return stream_id
return await self.db_pool.runInteraction(
"unsubscribe_user_from_thread", _unsubscribe_user_from_thread_txn
)
async def purge_thread_subscription_settings_for_user(self, user_id: str) -> None:
"""
Purge all subscriptions for the user.
The fact that subscriptions have been purged will not be streamed;
all stream rows for the user will in fact be removed.
This is intended only for dealing with user deactivation.
"""
def _purge_thread_subscription_settings_for_user_txn(
txn: LoggingTransaction,
) -> None:
self.db_pool.simple_delete_txn(
txn,
table="thread_subscriptions",
keyvalues={"user_id": user_id},
)
self._invalidate_cache_and_stream(
txn, self.get_subscription_for_thread, (user_id,)
)
await self.db_pool.runInteraction(
desc="purge_thread_subscription_settings_for_user",
func=_purge_thread_subscription_settings_for_user_txn,
)
@cached(tree=True)
async def get_subscription_for_thread(
self, user_id: str, room_id: str, thread_root_event_id: str
) -> Optional[ThreadSubscription]:
"""Get the thread subscription for a specific thread and user.
Args:
user_id: The ID of the user
room_id: The ID of the room
thread_root_event_id: The event ID of the thread root
Returns:
A `ThreadSubscription` dataclass if there is a subscription,
or `None` if there is no subscription.
If there is a row in the table but `subscribed` is `False`,
behaves the same as if there was no row at all and returns `None`.
"""
row = await self.db_pool.simple_select_one(
table="thread_subscriptions",
keyvalues={
"user_id": user_id,
"room_id": room_id,
"event_id": thread_root_event_id,
"subscribed": True,
},
retcols=("automatic",),
allow_none=True,
desc="get_subscription_for_thread",
)
if row is None:
return None
(automatic_rawbool,) = row
# convert SQLite integer booleans into real booleans
automatic = bool(automatic_rawbool)
return ThreadSubscription(automatic=automatic)
def get_max_thread_subscriptions_stream_id(self) -> int:
"""Get the current maximum stream_id for thread subscriptions.
Returns:
The maximum stream_id
"""
return self._thread_subscriptions_id_gen.get_current_token()
async def get_updated_thread_subscriptions(
self, from_id: int, to_id: int, limit: int
) -> List[Tuple[int, str, str, str]]:
"""Get updates to thread subscriptions between two stream IDs.
Args:
from_id: The starting stream ID (exclusive)
to_id: The ending stream ID (inclusive)
limit: The maximum number of rows to return
Returns:
list of (stream_id, user_id, room_id, thread_root_id) tuples
"""
def get_updated_thread_subscriptions_txn(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str]]:
sql = """
SELECT stream_id, user_id, room_id, event_id
FROM thread_subscriptions
WHERE ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
LIMIT ?
"""
txn.execute(sql, (from_id, to_id, limit))
return cast(List[Tuple[int, str, str, str]], txn.fetchall())
return await self.db_pool.runInteraction(
"get_updated_thread_subscriptions",
get_updated_thread_subscriptions_txn,
)
async def get_updated_thread_subscriptions_for_user(
self, user_id: str, from_id: int, to_id: int, limit: int
) -> List[Tuple[int, str, str]]:
"""Get updates to thread subscriptions for a specific user.
Args:
user_id: The ID of the user
from_id: The starting stream ID (exclusive)
to_id: The ending stream ID (inclusive)
limit: The maximum number of rows to return
Returns:
A list of (stream_id, room_id, thread_root_event_id) tuples.
"""
def get_updated_thread_subscriptions_for_user_txn(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str]]:
sql = """
SELECT stream_id, room_id, event_id
FROM thread_subscriptions
WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
LIMIT ?
"""
txn.execute(sql, (user_id, from_id, to_id, limit))
return [(row[0], row[1], row[2]) for row in txn]
return await self.db_pool.runInteraction(
"get_updated_thread_subscriptions_for_user",
get_updated_thread_subscriptions_for_user_txn,
)

View File

@@ -70,8 +70,6 @@ class UserErasureWorkerStore(CacheInvalidationWorkerStore):
return {u: u in erased_users for u in user_ids}
class UserErasureStore(UserErasureWorkerStore):
async def mark_user_erased(self, user_id: str) -> None:
"""Indicate that user_id wishes their message history to be erased.
@@ -113,3 +111,7 @@ class UserErasureStore(UserErasureWorkerStore):
self._invalidate_cache_and_stream(txn, self.is_user_erased, (user_id,))
await self.db_pool.runInteraction("mark_user_not_erased", f)
class UserErasureStore(UserErasureWorkerStore):
pass

View File

@@ -0,0 +1,59 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Introduce a table for tracking users' subscriptions to threads.
CREATE TABLE thread_subscriptions (
stream_id INTEGER NOT NULL PRIMARY KEY,
instance_name TEXT NOT NULL,
room_id TEXT NOT NULL,
event_id TEXT NOT NULL,
user_id TEXT NOT NULL,
subscribed BOOLEAN NOT NULL,
automatic BOOLEAN NOT NULL,
CONSTRAINT thread_subscriptions_fk_users
FOREIGN KEY (user_id)
REFERENCES users(name),
CONSTRAINT thread_subscriptions_fk_rooms
FOREIGN KEY (room_id)
-- When we delete a room, we should already have deleted all the events in that room
-- and so there shouldn't be any subscriptions left in that room.
-- So the `ON DELETE CASCADE` should be optional, but included anyway for good measure.
REFERENCES rooms(room_id) ON DELETE CASCADE,
CONSTRAINT thread_subscriptions_fk_events
FOREIGN KEY (event_id)
REFERENCES events(event_id) ON DELETE CASCADE,
-- This order provides a useful index for:
-- 1. foreign key constraint on (room_id)
-- 2. foreign key constraint on (room_id, event_id)
-- 3. finding the user's settings for a specific thread (as well as enforcing uniqueness)
UNIQUE (room_id, event_id, user_id)
);
-- this provides a useful index for finding a user's own rules,
-- potentially scoped to a single room
CREATE INDEX thread_subscriptions_user_room ON thread_subscriptions (user_id, room_id);
-- this provides a useful way for clients to efficiently find new changes to
-- their subscriptions.
-- (This is necessary to sync subscriptions between multiple devices.)
CREATE INDEX thread_subscriptions_by_user ON thread_subscriptions (user_id, stream_id);
-- this provides a useful index for deleting the subscriptions when the underlying
-- events are removed. This also covers the foreign key constraint on `events`.
CREATE INDEX thread_subscriptions_by_event ON thread_subscriptions (event_id);

View File

@@ -0,0 +1,19 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
CREATE SEQUENCE thread_subscriptions_sequence
-- Synapse streams start at 2, because the default position is 1
-- so any item inserted at position 1 is ignored.
-- This is also what existing streams do, except they use `setval(..., 1)`
-- which is semantically the same except less obvious.
START WITH 2;

View File

@@ -0,0 +1,18 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
COMMENT ON TABLE thread_subscriptions IS 'Tracks local users that subscribe to threads';
COMMENT ON COLUMN thread_subscriptions.subscribed IS 'Whether the user is subscribed to the thread or not. We track unsubscribed threads because we need to stream the subscription change to the client.';
COMMENT ON COLUMN thread_subscriptions.automatic IS 'True if the user was subscribed to the thread automatically by their client, or false if the client manually requested the subscription.';

View File

@@ -0,0 +1,24 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
COMMENT ON COLUMN threads.latest_event_id IS
'the ID of the event that is latest, ordered by (topological_ordering, stream_ordering)';
COMMENT ON COLUMN threads.topological_ordering IS
$$the topological ordering of the thread''s LATEST event.
Used as the primary way of ordering threads by recency in a room.$$;
COMMENT ON COLUMN threads.stream_ordering IS
$$the stream ordering of the thread's LATEST event.
Used as a tie-breaker for ordering threads by recency in a room, when the topological order is a tie.
Also used for recency ordering in sliding sync.$$;

View File

@@ -184,6 +184,12 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
Note: Only works with Postgres.
Warning: Streams using this generator start at ID 2, because ID 1 is always assumed
to have been 'seen as persisted'.
Unclear if this extant behaviour is desirable for some reason.
When creating a new sequence for a new stream,
it will be necessary to use `START WITH 2`.
Args:
db_conn
db
@@ -269,6 +275,9 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
self._known_persisted_positions: List[int] = []
# The maximum stream ID that we have seen been allocated across any writer.
# Since this defaults to 1, this means that ID 1 is assumed to have already
# been 'seen'. In other words, multi-writer streams start at 2.
# Unclear if this is desirable behaviour.
self._max_seen_allocated_stream_id = 1
# The maximum position of the local instance. This can be higher than

View File

@@ -362,7 +362,8 @@ class RoomID(DomainSpecificString):
@attr.s(slots=True, frozen=True, repr=False)
class EventID(DomainSpecificString):
"""Structure representing an event id."""
"""Structure representing an event ID which is namespaced to a homeserver.
Room versions 3 and above are not supported by this grammar."""
SIGIL = "$"

View File

@@ -0,0 +1,157 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
from twisted.test.proto_helpers import MemoryReactor
from synapse.replication.tcp.streams._base import (
_STREAM_UPDATE_TARGET_ROW_COUNT,
ThreadSubscriptionsStream,
)
from synapse.server import HomeServer
from synapse.storage.database import LoggingTransaction
from synapse.util import Clock
from tests.replication._base import BaseStreamTestCase
class ThreadSubscriptionsStreamTestCase(BaseStreamTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
super().prepare(reactor, clock, hs)
# Postgres
def f(txn: LoggingTransaction) -> None:
txn.execute(
"""
ALTER TABLE thread_subscriptions
DROP CONSTRAINT thread_subscriptions_fk_users,
DROP CONSTRAINT thread_subscriptions_fk_rooms,
DROP CONSTRAINT thread_subscriptions_fk_events;
""",
)
self.get_success(
self.hs.get_datastores().main.db_pool.runInteraction(
"disable_foreign_keys", f
)
)
def test_thread_subscription_updates(self) -> None:
"""Test replication with thread subscription updates"""
store = self.hs.get_datastores().main
# Create thread subscription updates
updates = []
room_id = "!test_room:example.com"
# Generate several thread subscription updates
for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
thread_root_id = f"$thread_{i}:example.com"
self.get_success(
store.subscribe_user_to_thread(
"@test_user:example.org",
room_id,
thread_root_id,
automatic=True,
)
)
updates.append(thread_root_id)
# Also add one in a different room
other_room_id = "!other_room:example.com"
other_thread_root_id = "$other_thread:example.com"
self.get_success(
store.subscribe_user_to_thread(
"@test_user:example.org",
other_room_id,
other_thread_root_id,
automatic=False,
)
)
# Not yet connected: no rows should yet have been received
self.assertEqual([], self.test_handler.received_rdata_rows)
# Now reconnect to pull the updates
self.reconnect()
self.replicate()
# We should have received all the expected rows in the right order
# Filter the updates to only include thread subscription changes
received_rows = [
upd
for upd in self.test_handler.received_rdata_rows
if upd[0] == ThreadSubscriptionsStream.NAME
]
# Verify all the thread subscription updates
for thread_id in updates:
(stream_name, token, row) = received_rows.pop(0)
self.assertEqual(stream_name, ThreadSubscriptionsStream.NAME)
self.assertIsInstance(row, ThreadSubscriptionsStream.ROW_TYPE)
self.assertEqual(row.user_id, "@test_user:example.org")
self.assertEqual(row.room_id, room_id)
self.assertEqual(row.event_id, thread_id)
# Verify the last update in the different room
(stream_name, token, row) = received_rows.pop(0)
self.assertEqual(stream_name, ThreadSubscriptionsStream.NAME)
self.assertIsInstance(row, ThreadSubscriptionsStream.ROW_TYPE)
self.assertEqual(row.user_id, "@test_user:example.org")
self.assertEqual(row.room_id, other_room_id)
self.assertEqual(row.event_id, other_thread_root_id)
self.assertEqual([], received_rows)
def test_multiple_users_thread_subscription_updates(self) -> None:
"""Test replication with thread subscription updates for multiple users"""
store = self.hs.get_datastores().main
room_id = "!test_room:example.com"
thread_root_id = "$thread_root:example.com"
# Create updates for multiple users
users = ["@user1:example.com", "@user2:example.com", "@user3:example.com"]
for user_id in users:
self.get_success(
store.subscribe_user_to_thread(
user_id, room_id, thread_root_id, automatic=True
)
)
# Check no rows have been received yet
self.replicate()
self.assertEqual([], self.test_handler.received_rdata_rows)
# Not yet connected: no rows should yet have been received
self.reconnect()
self.replicate()
# We should have received all the expected rows
# Filter the updates to only include thread subscription changes
received_rows = [
upd
for upd in self.test_handler.received_rdata_rows
if upd[0] == ThreadSubscriptionsStream.NAME
]
# Should have one update per user
self.assertEqual(len(received_rows), len(users))
# Verify all updates
for i, user_id in enumerate(users):
(stream_name, token, row) = received_rows[i]
self.assertEqual(stream_name, ThreadSubscriptionsStream.NAME)
self.assertIsInstance(row, ThreadSubscriptionsStream.ROW_TYPE)
self.assertEqual(row.user_id, user_id)
self.assertEqual(row.room_id, room_id)
self.assertEqual(row.event_id, thread_root_id)

View File

@@ -5667,6 +5667,54 @@ class UserRedactionTestCase(unittest.HomeserverTestCase):
matched.append(event_id)
self.assertEqual(len(matched), len(originals))
def test_use_admin_param_for_redactions(self) -> None:
"""
Test that if the `use_admin` param is set to true, the admin user is used to issue
the redactions and that they succeed in a room where the admin user has sufficient
power to issue redactions
"""
originals = []
join = self.helper.join(self.rm1, self.bad_user, tok=self.bad_user_tok)
originals.append(join["event_id"])
for i in range(15):
event = {"body": f"hello{i}", "msgtype": "m.text"}
res = self.helper.send_event(
self.rm1, "m.room.message", event, tok=self.bad_user_tok
)
originals.append(res["event_id"])
# redact messages
channel = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": [self.rm1], "use_admin": True},
access_token=self.admin_tok,
)
self.assertEqual(channel.code, 200)
# messages are redacted, and redactions are issued by the admin user
filter = json.dumps({"types": [EventTypes.Redaction]})
channel = self.make_request(
"GET",
f"rooms/{self.rm1}/messages?filter={filter}&limit=50",
access_token=self.admin_tok,
)
self.assertEqual(channel.code, 200)
matches = []
for event in channel.json_body["chunk"]:
for event_id in originals:
if event["type"] == "m.room.redaction" and event["redacts"] == event_id:
matches.append((event_id, event))
# we redacted 16 messages
self.assertEqual(len(matches), 16)
for redaction_tuple in matches:
redaction = redaction_tuple[1]
if redaction["sender"] != self.admin:
self.fail("Redaction was not issued by admin account")
class UserRedactionBackgroundTaskTestCase(BaseMultiWorkerStreamTestCase):
servlets = [

View File

@@ -1181,7 +1181,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
bundled_aggregations,
)
self._test_bundled_aggregations(RelationTypes.REFERENCE, assert_annotations, 8)
self._test_bundled_aggregations(RelationTypes.REFERENCE, assert_annotations, 7)
def test_thread(self) -> None:
"""
@@ -1226,21 +1226,21 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
# The "user" sent the root event and is making queries for the bundled
# aggregations: they have participated.
self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 9)
self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 7)
# The "user2" sent replies in the thread and is making queries for the
# bundled aggregations: they have participated.
#
# Note that this re-uses some cached values, so the total number of
# queries is much smaller.
self._test_bundled_aggregations(
RelationTypes.THREAD, _gen_assert(True), 6, access_token=self.user2_token
RelationTypes.THREAD, _gen_assert(True), 4, access_token=self.user2_token
)
# A user with no interactions with the thread: they have not participated.
user3_id, user3_token = self._create_user("charlie")
self.helper.join(self.room, user=user3_id, tok=user3_token)
self._test_bundled_aggregations(
RelationTypes.THREAD, _gen_assert(False), 6, access_token=user3_token
RelationTypes.THREAD, _gen_assert(False), 4, access_token=user3_token
)
def test_thread_with_bundled_aggregations_for_latest(self) -> None:
@@ -1287,7 +1287,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
bundled_aggregations["latest_event"].get("unsigned"),
)
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 9)
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 7)
def test_nested_thread(self) -> None:
"""

View File

@@ -0,0 +1,256 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
from http import HTTPStatus
from twisted.test.proto_helpers import MemoryReactor
from synapse.rest import admin
from synapse.rest.client import login, profile, room, thread_subscriptions
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from tests import unittest
PREFIX = "/_matrix/client/unstable/io.element.msc4306/rooms"
class ThreadSubscriptionsTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets_for_client_rest_resource,
login.register_servlets,
profile.register_servlets,
room.register_servlets,
thread_subscriptions.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["experimental_features"] = {"msc4306_enabled": True}
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.user_id = self.register_user("user", "password")
self.token = self.login("user", "password")
self.other_user_id = self.register_user("other_user", "password")
self.other_token = self.login("other_user", "password")
# Create a room and send a message to use as a thread root
self.room_id = self.helper.create_room_as(self.user_id, tok=self.token)
self.helper.join(self.room_id, self.other_user_id, tok=self.other_token)
response = self.helper.send(self.room_id, body="Root message", tok=self.token)
self.root_event_id = response["event_id"]
# Send a message in the thread
self.helper.send_event(
room_id=self.room_id,
type="m.room.message",
content={
"body": "Thread message",
"msgtype": "m.text",
"m.relates_to": {
"rel_type": "m.thread",
"event_id": self.root_event_id,
},
},
tok=self.token,
)
def test_get_thread_subscription_unsubscribed(self) -> None:
"""Test retrieving thread subscription when not subscribed."""
channel = self.make_request(
"GET",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.NOT_FOUND)
self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND")
def test_get_thread_subscription_nonexistent_thread(self) -> None:
"""Test retrieving subscription settings for a nonexistent thread."""
channel = self.make_request(
"GET",
f"{PREFIX}/{self.room_id}/thread/$nonexistent:example.org/subscription",
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.NOT_FOUND)
self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND")
def test_get_thread_subscription_no_access(self) -> None:
"""Test that a user can't get thread subscription for a thread they can't access."""
self.register_user("no_access", "password")
no_access_token = self.login("no_access", "password")
channel = self.make_request(
"GET",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
access_token=no_access_token,
)
self.assertEqual(channel.code, HTTPStatus.NOT_FOUND)
self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND")
def test_subscribe_manual_then_automatic(self) -> None:
"""Test subscribing to a thread, first a manual subscription then an automatic subscription.
The manual subscription wins over the automatic one."""
channel = self.make_request(
"PUT",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
{
"automatic": False,
},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
# Assert the subscription was saved
channel = self.make_request(
"GET",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
self.assertEqual(channel.json_body, {"automatic": False})
# Now also register an automatic subscription; it should not
# override the manual subscription
channel = self.make_request(
"PUT",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
{"automatic": True},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
# Assert the manual subscription was not overridden
channel = self.make_request(
"GET",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
self.assertEqual(channel.json_body, {"automatic": False})
def test_subscribe_automatic_then_manual(self) -> None:
"""Test subscribing to a thread, first an automatic subscription then a manual subscription.
The manual subscription wins over the automatic one."""
channel = self.make_request(
"PUT",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
{
"automatic": True,
},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
# Assert the subscription was saved
channel = self.make_request(
"GET",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
self.assertEqual(channel.json_body, {"automatic": True})
# Now also register a manual subscription
channel = self.make_request(
"PUT",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
{"automatic": False},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
# Assert the manual subscription was not overridden
channel = self.make_request(
"GET",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
self.assertEqual(channel.json_body, {"automatic": False})
def test_unsubscribe(self) -> None:
"""Test subscribing to a thread, then unsubscribing."""
channel = self.make_request(
"PUT",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
{
"automatic": True,
},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
# Assert the subscription was saved
channel = self.make_request(
"GET",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
self.assertEqual(channel.json_body, {"automatic": True})
# Now also register a manual subscription
channel = self.make_request(
"DELETE",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
# Assert the manual subscription was not overridden
channel = self.make_request(
"GET",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.NOT_FOUND)
self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND")
def test_set_thread_subscription_nonexistent_thread(self) -> None:
"""Test setting subscription settings for a nonexistent thread."""
channel = self.make_request(
"PUT",
f"{PREFIX}/{self.room_id}/thread/$nonexistent:example.org/subscription",
{"automatic": True},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.NOT_FOUND)
self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND")
def test_set_thread_subscription_no_access(self) -> None:
"""Test that a user can't set thread subscription for a thread they can't access."""
self.register_user("no_access2", "password")
no_access_token = self.login("no_access2", "password")
channel = self.make_request(
"PUT",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
{"automatic": True},
access_token=no_access_token,
)
self.assertEqual(channel.code, HTTPStatus.NOT_FOUND)
self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND")
def test_invalid_body(self) -> None:
"""Test that sending invalid subscription settings is rejected."""
channel = self.make_request(
"PUT",
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
# non-boolean `automatic`
{"automatic": "true"},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST)

View File

@@ -0,0 +1,12 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.

View File

@@ -0,0 +1,43 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
from twisted.web.resource import Resource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
from synapse.types import JsonDict
from tests import unittest
class BaseTestCase(unittest.HomeserverTestCase):
SHARED_SECRET = "shared_secret"
def default_config(self) -> JsonDict:
config = super().default_config()
config["enable_registration"] = False
config["experimental_features"] = {
"msc3861": {
"enabled": True,
"issuer": "https://example.com",
"client_id": "dummy",
"client_auth_method": "client_secret_basic",
"client_secret": "dummy",
"admin_token": self.SHARED_SECRET,
}
}
return config
def create_resource_dict(self) -> dict[str, Resource]:
base = super().create_resource_dict()
base.update(build_synapse_client_resource_tree(self.hs))
return base

View File

@@ -0,0 +1,693 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
from twisted.test.proto_helpers import MemoryReactor
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from tests.unittest import skip_unless
from tests.utils import HAS_AUTHLIB
from ._base import BaseTestCase
@skip_unless(HAS_AUTHLIB, "requires authlib")
class MasUpsertDeviceResource(BaseTestCase):
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
# Create a user for testing
self.alice_user_id = UserID("alice", "test")
self.get_success(
homeserver.get_registration_handler().register_user(
localpart=self.alice_user_id.localpart,
)
)
def test_other_token(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/upsert_device",
shorthand=False,
access_token="other_token",
content={
"localpart": "alice",
"device_id": "DEVICE1",
},
)
self.assertEqual(channel.code, 403, channel.json_body)
self.assertEqual(
channel.json_body["error"], "This endpoint must only be called by MAS"
)
def test_upsert_device(self) -> None:
store = self.hs.get_datastores().main
channel = self.make_request(
"POST",
"/_synapse/mas/upsert_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"device_id": "DEVICE1",
},
)
# This created a new device, hence the 201 status code
self.assertEqual(channel.code, 201, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify the device exists
device = self.get_success(store.get_device(str(self.alice_user_id), "DEVICE1"))
assert device is not None
self.assertEqual(device["device_id"], "DEVICE1")
self.assertIsNone(device["display_name"])
def test_update_existing_device(self) -> None:
store = self.hs.get_datastores().main
device_handler = self.hs.get_device_handler()
# Create an initial device
self.get_success(
device_handler.upsert_device(
user_id=str(self.alice_user_id),
device_id="DEVICE1",
display_name="Old Name",
)
)
channel = self.make_request(
"POST",
"/_synapse/mas/upsert_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"device_id": "DEVICE1",
"display_name": "New Name",
},
)
# This updated an existing device, hence the 200 status code
self.assertEqual(channel.code, 200, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify the device was updated
device = self.get_success(store.get_device(str(self.alice_user_id), "DEVICE1"))
assert device is not None
self.assertEqual(device["display_name"], "New Name")
def test_upsert_device_with_display_name(self) -> None:
store = self.hs.get_datastores().main
channel = self.make_request(
"POST",
"/_synapse/mas/upsert_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"device_id": "DEVICE1",
"display_name": "Alice's Phone",
},
)
self.assertEqual(channel.code, 201, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify the device exists with correct display name
device = self.get_success(store.get_device(str(self.alice_user_id), "DEVICE1"))
assert device is not None
self.assertEqual(device["display_name"], "Alice's Phone")
def test_upsert_device_missing_localpart(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/upsert_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"device_id": "DEVICE1",
},
)
self.assertEqual(channel.code, 400, channel.json_body)
def test_upsert_device_missing_device_id(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/upsert_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
},
)
self.assertEqual(channel.code, 400, channel.json_body)
def test_upsert_device_nonexistent_user(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/upsert_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "nonexistent",
"device_id": "DEVICE1",
},
)
# We get a 404 here as the user doesn't exist
self.assertEqual(channel.code, 404, channel.json_body)
@skip_unless(HAS_AUTHLIB, "requires authlib")
class MasDeleteDeviceResource(BaseTestCase):
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
# Create a user and device for testing
self.alice_user_id = UserID("alice", "test")
self.get_success(
homeserver.get_registration_handler().register_user(
localpart=self.alice_user_id.localpart,
)
)
# Create a device
device_handler = homeserver.get_device_handler()
self.get_success(
device_handler.upsert_device(
user_id=str(self.alice_user_id),
device_id="DEVICE1",
display_name="Test Device",
)
)
def test_other_token(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/delete_device",
shorthand=False,
access_token="other_token",
content={
"localpart": "alice",
"device_id": "DEVICE1",
},
)
self.assertEqual(channel.code, 403, channel.json_body)
self.assertEqual(
channel.json_body["error"], "This endpoint must only be called by MAS"
)
def test_delete_device(self) -> None:
store = self.hs.get_datastores().main
# Verify device exists before deletion
device = self.get_success(store.get_device(str(self.alice_user_id), "DEVICE1"))
assert device is not None
channel = self.make_request(
"POST",
"/_synapse/mas/delete_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"device_id": "DEVICE1",
},
)
self.assertEqual(channel.code, 204)
# Verify the device no longer exists
device = self.get_success(store.get_device(str(self.alice_user_id), "DEVICE1"))
self.assertIsNone(device)
def test_delete_nonexistent_device(self) -> None:
# Deleting a non-existent device should be idempotent
channel = self.make_request(
"POST",
"/_synapse/mas/delete_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"device_id": "NONEXISTENT",
},
)
self.assertEqual(channel.code, 204)
def test_delete_device_missing_localpart(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/delete_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"device_id": "DEVICE1",
},
)
self.assertEqual(channel.code, 400, channel.json_body)
def test_delete_device_missing_device_id(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/delete_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
},
)
self.assertEqual(channel.code, 400, channel.json_body)
def test_delete_device_nonexistent_user(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/delete_device",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "nonexistent",
"device_id": "DEVICE1",
},
)
# Should fail on a non-existent user
self.assertEqual(channel.code, 404, channel.json_body)
@skip_unless(HAS_AUTHLIB, "requires authlib")
class MasUpdateDeviceDisplayNameResource(BaseTestCase):
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
# Create a user and device for testing
self.alice_user_id = UserID("alice", "test")
self.get_success(
homeserver.get_registration_handler().register_user(
localpart=self.alice_user_id.localpart,
)
)
# Create a device
device_handler = homeserver.get_device_handler()
self.get_success(
device_handler.upsert_device(
user_id=str(self.alice_user_id),
device_id="DEVICE1",
display_name="Old Name",
)
)
def test_other_token(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/update_device_display_name",
shorthand=False,
access_token="other_token",
content={
"localpart": "alice",
"device_id": "DEVICE1",
"display_name": "New Name",
},
)
self.assertEqual(channel.code, 403, channel.json_body)
self.assertEqual(
channel.json_body["error"], "This endpoint must only be called by MAS"
)
def test_update_device_display_name(self) -> None:
store = self.hs.get_datastores().main
# Verify initial display name
device = self.get_success(store.get_device(str(self.alice_user_id), "DEVICE1"))
assert device is not None
self.assertEqual(device["display_name"], "Old Name")
channel = self.make_request(
"POST",
"/_synapse/mas/update_device_display_name",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"device_id": "DEVICE1",
"display_name": "Updated Name",
},
)
self.assertEqual(channel.code, 200, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify the display name was updated
device = self.get_success(store.get_device(str(self.alice_user_id), "DEVICE1"))
assert device is not None
self.assertEqual(device["display_name"], "Updated Name")
def test_update_nonexistent_device(self) -> None:
# Updating a non-existent device should fail
channel = self.make_request(
"POST",
"/_synapse/mas/update_device_display_name",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"device_id": "NONEXISTENT",
"display_name": "New Name",
},
)
self.assertEqual(channel.code, 404, channel.json_body)
def test_update_device_display_name_missing_localpart(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/update_device_display_name",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"device_id": "DEVICE1",
"display_name": "New Name",
},
)
self.assertEqual(channel.code, 400, channel.json_body)
def test_update_device_display_name_missing_device_id(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/update_device_display_name",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"display_name": "New Name",
},
)
self.assertEqual(channel.code, 400, channel.json_body)
def test_update_device_display_name_missing_display_name(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/update_device_display_name",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"device_id": "DEVICE1",
},
)
self.assertEqual(channel.code, 400, channel.json_body)
def test_update_device_display_name_nonexistent_user(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/update_device_display_name",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "nonexistent",
"device_id": "DEVICE1",
"display_name": "New Name",
},
)
self.assertEqual(channel.code, 404, channel.json_body)
@skip_unless(HAS_AUTHLIB, "requires authlib")
class MasSyncDevicesResource(BaseTestCase):
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
# Create a user for testing
self.alice_user_id = UserID("alice", "test")
self.get_success(
homeserver.get_registration_handler().register_user(
localpart=self.alice_user_id.localpart,
)
)
# Create some initial devices
device_handler = homeserver.get_device_handler()
for device_id in ["DEVICE1", "DEVICE2", "DEVICE3"]:
self.get_success(
device_handler.upsert_device(
user_id=str(self.alice_user_id),
device_id=device_id,
display_name=f"Device {device_id}",
)
)
def test_other_token(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token="other_token",
content={
"localpart": "alice",
"devices": ["DEVICE1", "DEVICE2"],
},
)
self.assertEqual(channel.code, 403, channel.json_body)
self.assertEqual(
channel.json_body["error"], "This endpoint must only be called by MAS"
)
def test_sync_devices_no_changes(self) -> None:
# Sync with the same devices that already exist
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"devices": ["DEVICE1", "DEVICE2", "DEVICE3"],
},
)
self.assertEqual(channel.code, 200, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify all devices still exist
store = self.hs.get_datastores().main
devices = self.get_success(store.get_devices_by_user(str(self.alice_user_id)))
self.assertEqual(set(devices.keys()), {"DEVICE1", "DEVICE2", "DEVICE3"})
def test_sync_devices_add_only(self) -> None:
# Sync with additional devices
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"devices": ["DEVICE1", "DEVICE2", "DEVICE3", "DEVICE4", "DEVICE5"],
},
)
self.assertEqual(channel.code, 200, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify new devices were added
store = self.hs.get_datastores().main
devices = self.get_success(store.get_devices_by_user(str(self.alice_user_id)))
self.assertEqual(
set(devices.keys()), {"DEVICE1", "DEVICE2", "DEVICE3", "DEVICE4", "DEVICE5"}
)
def test_sync_devices_delete_only(self) -> None:
# Sync with fewer devices
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"devices": ["DEVICE1"],
},
)
self.assertEqual(channel.code, 200, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify devices were deleted
store = self.hs.get_datastores().main
devices = self.get_success(store.get_devices_by_user(str(self.alice_user_id)))
self.assertEqual(set(devices.keys()), {"DEVICE1"})
def test_sync_devices_add_and_delete(self) -> None:
# Sync with a mix of additions and deletions
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"devices": ["DEVICE1", "DEVICE4", "DEVICE5"],
},
)
self.assertEqual(channel.code, 200, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify the correct devices exist
store = self.hs.get_datastores().main
devices = self.get_success(store.get_devices_by_user(str(self.alice_user_id)))
self.assertEqual(set(devices.keys()), {"DEVICE1", "DEVICE4", "DEVICE5"})
def test_sync_devices_empty_list(self) -> None:
# Sync with empty device list (delete all devices)
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"devices": [],
},
)
self.assertEqual(channel.code, 200, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify all devices were deleted
store = self.hs.get_datastores().main
devices = self.get_success(store.get_devices_by_user(str(self.alice_user_id)))
self.assertEqual(devices, {})
def test_sync_devices_for_new_user(self) -> None:
# Test syncing devices for a user that doesn't have any devices yet
bob_user_id = UserID("bob", "test")
self.get_success(
self.hs.get_registration_handler().register_user(
localpart=bob_user_id.localpart,
)
)
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "bob",
"devices": ["DEVICE1", "DEVICE2"],
},
)
self.assertEqual(channel.code, 200, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify devices were created
store = self.hs.get_datastores().main
devices = self.get_success(store.get_devices_by_user(str(bob_user_id)))
self.assertEqual(set(devices.keys()), {"DEVICE1", "DEVICE2"})
def test_sync_devices_missing_localpart(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"devices": ["DEVICE1", "DEVICE2"],
},
)
self.assertEqual(channel.code, 400, channel.json_body)
def test_sync_devices_missing_devices(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
},
)
self.assertEqual(channel.code, 400, channel.json_body)
def test_sync_devices_invalid_devices_type(self) -> None:
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"devices": "not_a_list",
},
)
self.assertEqual(channel.code, 400, channel.json_body)
def test_sync_devices_nonexistent_user(self) -> None:
# Test syncing devices for a user that doesn't exist
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "nonexistent",
"devices": ["DEVICE1", "DEVICE2"],
},
)
self.assertEqual(channel.code, 404, channel.json_body)
def test_sync_devices_duplicate_device_ids(self) -> None:
# Test syncing with duplicate device IDs (sets should handle this)
channel = self.make_request(
"POST",
"/_synapse/mas/sync_devices",
shorthand=False,
access_token=self.SHARED_SECRET,
content={
"localpart": "alice",
"devices": ["DEVICE1", "DEVICE1", "DEVICE2"],
},
)
self.assertEqual(channel.code, 200, channel.json_body)
self.assertEqual(channel.json_body, {})
# Verify the correct devices exist (duplicates should be handled)
store = self.hs.get_datastores().main
devices = self.get_success(store.get_devices_by_user(str(self.alice_user_id)))
self.assertEqual(sorted(devices.keys()), ["DEVICE1", "DEVICE2"])

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,272 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
from typing import Optional
from twisted.test.proto_helpers import MemoryReactor
from synapse.server import HomeServer
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines.sqlite import Sqlite3Engine
from synapse.util import Clock
from tests import unittest
class ThreadSubscriptionsTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = self.hs.get_datastores().main
self.user_id = "@user:test"
self.room_id = "!room:test"
self.thread_root_id = "$thread_root:test"
self.other_thread_root_id = "$other_thread_root:test"
# Disable foreign key checks for testing
# This allows us to insert test data without having to create actual events
db_pool = self.store.db_pool
if isinstance(db_pool.engine, Sqlite3Engine):
self.get_success(
db_pool.execute("disable_foreign_keys", "PRAGMA foreign_keys = OFF;")
)
else:
# Postgres
def f(txn: LoggingTransaction) -> None:
txn.execute(
"""
ALTER TABLE thread_subscriptions
DROP CONSTRAINT thread_subscriptions_fk_users,
DROP CONSTRAINT thread_subscriptions_fk_rooms,
DROP CONSTRAINT thread_subscriptions_fk_events;
""",
)
self.get_success(db_pool.runInteraction("disable_foreign_keys", f))
# Create rooms and events in the db to satisfy foreign key constraints
self.get_success(db_pool.simple_insert("rooms", {"room_id": self.room_id}))
self.get_success(
db_pool.simple_insert(
"events",
{
"event_id": self.thread_root_id,
"room_id": self.room_id,
"topological_ordering": 1,
"stream_ordering": 1,
"type": "m.room.message",
"depth": 1,
"processed": True,
"outlier": False,
},
)
)
self.get_success(
db_pool.simple_insert(
"events",
{
"event_id": self.other_thread_root_id,
"room_id": self.room_id,
"topological_ordering": 2,
"stream_ordering": 2,
"type": "m.room.message",
"depth": 2,
"processed": True,
"outlier": False,
},
)
)
# Create the user
self.get_success(
db_pool.simple_insert("users", {"name": self.user_id, "is_guest": 0})
)
def _subscribe(
self,
thread_root_id: str,
*,
automatic: bool,
room_id: Optional[str] = None,
user_id: Optional[str] = None,
) -> Optional[int]:
if user_id is None:
user_id = self.user_id
if room_id is None:
room_id = self.room_id
return self.get_success(
self.store.subscribe_user_to_thread(
user_id,
room_id,
thread_root_id,
automatic=automatic,
)
)
def _unsubscribe(
self,
thread_root_id: str,
room_id: Optional[str] = None,
user_id: Optional[str] = None,
) -> Optional[int]:
if user_id is None:
user_id = self.user_id
if room_id is None:
room_id = self.room_id
return self.get_success(
self.store.unsubscribe_user_from_thread(
user_id,
room_id,
thread_root_id,
)
)
def test_set_and_get_thread_subscription(self) -> None:
"""Test basic setting and getting of thread subscriptions."""
# Initial state: no subscription
subscription = self.get_success(
self.store.get_subscription_for_thread(
self.user_id, self.room_id, self.thread_root_id
)
)
self.assertIsNone(subscription)
# Subscribe
self._subscribe(
self.thread_root_id,
automatic=True,
)
# Assert subscription went through
subscription = self.get_success(
self.store.get_subscription_for_thread(
self.user_id, self.room_id, self.thread_root_id
)
)
self.assertIsNotNone(subscription)
self.assertTrue(subscription.automatic) # type: ignore
# Now make it a manual subscription
self._subscribe(
self.thread_root_id,
automatic=False,
)
# Assert the manual subscription overrode the automatic one
subscription = self.get_success(
self.store.get_subscription_for_thread(
self.user_id, self.room_id, self.thread_root_id
)
)
self.assertFalse(subscription.automatic) # type: ignore
def test_purge_thread_subscriptions_for_user(self) -> None:
"""Test purging all thread subscription settings for a user."""
# Set subscription settings for multiple threads
self._subscribe(self.thread_root_id, automatic=True)
self._subscribe(self.other_thread_root_id, automatic=False)
subscriptions = self.get_success(
self.store.get_updated_thread_subscriptions_for_user(
self.user_id,
from_id=0,
to_id=50,
limit=50,
)
)
min_id = min(id for (id, _, _) in subscriptions)
self.assertEqual(
subscriptions,
[
(min_id, self.room_id, self.thread_root_id),
(min_id + 1, self.room_id, self.other_thread_root_id),
],
)
# Purge all settings for the user
self.get_success(
self.store.purge_thread_subscription_settings_for_user(self.user_id)
)
# Check user has no subscriptions
subscriptions = self.get_success(
self.store.get_updated_thread_subscriptions_for_user(
self.user_id,
from_id=0,
to_id=50,
limit=50,
)
)
self.assertEqual(subscriptions, [])
def test_get_updated_thread_subscriptions(self) -> None:
"""Test getting updated thread subscriptions since a stream ID."""
stream_id1 = self._subscribe(self.thread_root_id, automatic=False)
stream_id2 = self._subscribe(self.other_thread_root_id, automatic=True)
assert stream_id1 is not None
assert stream_id2 is not None
# Get updates since initial ID (should include both changes)
updates = self.get_success(
self.store.get_updated_thread_subscriptions(0, stream_id2, 10)
)
self.assertEqual(len(updates), 2)
# Get updates since first change (should include only the second change)
updates = self.get_success(
self.store.get_updated_thread_subscriptions(stream_id1, stream_id2, 10)
)
self.assertEqual(
updates,
[(stream_id2, self.user_id, self.room_id, self.other_thread_root_id)],
)
def test_get_updated_thread_subscriptions_for_user(self) -> None:
"""Test getting updated thread subscriptions for a specific user."""
other_user_id = "@other_user:test"
# Set thread subscription for main user
stream_id1 = self._subscribe(self.thread_root_id, automatic=True)
assert stream_id1 is not None
# Set thread subscription for other user
stream_id2 = self._subscribe(
self.other_thread_root_id,
automatic=True,
user_id=other_user_id,
)
assert stream_id2 is not None
# Get updates for main user
updates = self.get_success(
self.store.get_updated_thread_subscriptions_for_user(
self.user_id, 0, stream_id2, 10
)
)
self.assertEqual(updates, [(stream_id1, self.room_id, self.thread_root_id)])
# Get updates for other user
updates = self.get_success(
self.store.get_updated_thread_subscriptions_for_user(
other_user_id, 0, max(stream_id1, stream_id2), 10
)
)
self.assertEqual(
updates, [(stream_id2, self.room_id, self.other_thread_root_id)]
)