Compare commits

...

42 Commits

Author SHA1 Message Date
Andrew Morgan
e7ca69d6b2 Remove None handling
Since `_get_e2e_cross_signing_signatures_for_devices` always return a key for every entry in `device_query`,
there will never be a case where the function can return `None`, even when decorated with `@cacheList`.
2025-09-17 18:50:15 +01:00
Andrew Morgan
90387cc45f Use make_tuple_in_list_sql_clause
Required defining a three-column overload for the method to pass mypy.

Additionally the type-ignore was no longer required according to mypy.
2025-09-17 18:44:10 +01:00
Andrew Morgan
c6534e66b1 JSON encode replication keys
Instead of encoding them using a brittle, custom scheme.

Avoids errors where the device ID or user ID contains a comma.
2025-09-17 18:09:41 +01:00
Andrew Morgan
a096967507 lint 2025-09-17 11:52:35 +01:00
Andrew Morgan
3ec9f164f2 newsfile 2025-09-17 11:52:31 +01:00
Andrew Morgan
b4ff74c2f8 Clarify @cachedList None-caching behaviour 2025-09-17 11:52:31 +01:00
Andrew Morgan
21c2c0403c Special-case handling of cache invalidation stream
As the "keys" in an incoming cache invalidation stream object appear to always be a string,
we have to do some special case handling to convert the string into a tuple of (user_id, device_id).

This works, but is awful.
2025-09-17 11:46:25 +01:00
Andrew Morgan
1ea7cf0c57 Invalidate cross signing signatures cache
Invalidate and stream whenever we update `e2e_cross_signing_signatures`.
2025-09-17 11:36:43 +01:00
Andrew Morgan
6ffd8b9119 Make store_e2e_cross_signing_signatures use a custom txn func
This is needed to then call `_invalidate_cache_and_stream_bulk
with the `txn` object in the next commit.
2025-09-17 11:14:40 +01:00
Andrew Morgan
325632fdfc Update the calling site
The return type has changed, and we now need to handle a map of user_id/device_id tuples to lists of key signatures.
2025-09-17 11:12:52 +01:00
Andrew Morgan
ce8b0747ba Use @cachedList with _get_e2e_cross_signing_signatures_for_devices
The use of `@cachedList` requires having a cached, equivalent "single
item" method that represents the key and cached value. Calls to methods
with @cachedList are then processed and items that are already cached
are removed before the @cachedList-decorated method is called. The
single item method should not be directly used.

The return type needed to be changed to a "Mapping" containing a "Sequence" to satisfy the
requirement that cached results must be immutable (lest you risk
mutating the cache).

`@cachedList` also requires returning an dict where the keys
matches that of the items in the cached input list. Our keys are tuples (and you'll later see why this is a pain).
2025-09-17 11:07:11 +01:00
Eric Eastwood
84d64251dc Remove sentinel logcontext where we log in setup, start and exit (#18870)
Remove `sentinel` logcontext where we log in `setup`, `start`, and exit.

Instead of having one giant PR that removes all places we use `sentinel`
logcontext, I've decided to tackle this more piece-meal. This PR covers
the parts if you just startup Synapse and exit it with no requests or
activity going on in between.

Part of https://github.com/element-hq/synapse/issues/18905 (Remove
`sentinel` logcontext where we log in Synapse)

Prerequisite for https://github.com/element-hq/synapse/pull/18868.
Logging with the `sentinel` logcontext means we won't know which server
the log came from.



### Why


9cc4001778/docs/log_contexts.md (L71-L81)

(docs updated in https://github.com/element-hq/synapse/pull/18900)


### Testing strategy

1. Run Synapse normally and with `daemonize: true`: `poetry run
synapse_homeserver --config-path homeserver.yaml`
 1. Execute some requests
 1. Shutdown the server
 1. Look for any bad log entries in your homeserver logs:
    - `Expected logging context sentinel but found main`
    - `Expected logging context main was lost`
    - `Expected previous context`
    - `utime went backwards!`/`stime went backwards!`
- `Called stop on logcontext POST-0 without recording a start rusage`
 1. Look for any logs coming from the `sentinel` context


With these changes, you should only see the following logs (not from
Synapse) using the `sentinel` context if you start up Synapse and exit:

`homeserver.log`
```
2025-09-10 14:45:39,924 - asyncio - 64 - DEBUG - sentinel - Using selector: EpollSelector

2025-09-10 14:45:40,562 - twisted - 281 - INFO - sentinel - Received SIGINT, shutting down.

2025-09-10 14:45:40,562 - twisted - 281 - INFO - sentinel - (TCP Port 9322 Closed)
2025-09-10 14:45:40,563 - twisted - 281 - INFO - sentinel - (TCP Port 8008 Closed)
2025-09-10 14:45:40,563 - twisted - 281 - INFO - sentinel - (TCP Port 9093 Closed)
2025-09-10 14:45:40,564 - twisted - 281 - INFO - sentinel - Main loop terminated.
```
2025-09-16 17:15:08 -05:00
dependabot[bot]
2bed3fb566 Bump serde from 1.0.219 to 1.0.223 (#18920)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-15 20:05:23 +01:00
dependabot[bot]
2c60b67a95 Bump types-setuptools from 80.9.0.20250809 to 80.9.0.20250822 (#18924)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-15 17:37:43 +01:00
dependabot[bot]
6358afff8d Bump pydantic from 2.11.7 to 2.11.9 (#18922)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-15 17:37:24 +01:00
dependabot[bot]
f7b547e2d8 Bump authlib from 1.6.1 to 1.6.3 (#18921)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-15 17:35:11 +01:00
dependabot[bot]
8f7bd946de Bump serde_json from 1.0.143 to 1.0.145 (#18919)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-15 17:31:12 +01:00
dependabot[bot]
4f80fa4b0a Bump types-psycopg2 from 2.9.21.20250809 to 2.9.21.20250915 (#18918)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-15 17:29:49 +01:00
dependabot[bot]
b2592667a4 Bump sigstore/cosign-installer from 3.9.2 to 3.10.0 (#18917)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-15 17:26:04 +01:00
Eric Eastwood
769d30a247 Clarify Python dependency constraints (#18856)
Clarify Python dependency constraints

Spawning from
https://github.com/element-hq/synapse/pull/18852#issuecomment-3212003675
as I don't actually know the the exact rule of thumb. It's unclear to me
what we care about exactly. Our [deprecation
policy](https://element-hq.github.io/synapse/latest/deprecation_policy.html)
mentions Debian oldstable support at-least for the version of SQLite.
But then we only refer to Debian stable for the Twisted dependency.
2025-09-15 09:45:41 -05:00
Eric Eastwood
7ecfe8b1a8 Better explain which context the task is run in when using run_in_background(...) or run_as_background_process(...) (#18906)
Follow-up to https://github.com/element-hq/synapse/pull/18900
2025-09-12 09:29:35 -05:00
Hugh Nimmo-Smith
e1036ffa48 Add get_media_upload_limits_for_user and on_media_upload_limit_exceeded callbacks to module API (#18848)
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2025-09-12 12:26:19 +01:00
Andrew Morgan
8c98cf7e55 Remove usage of deprecated pkg_resources interface (#18910) 2025-09-12 10:57:04 +01:00
Kegan Dougal
ec64c3e88d Ensure we /send PDUs which pass canonical JSON checks (#18641)
### Pull Request Checklist

Fixes https://github.com/element-hq/synapse/issues/18554

Looks like this was missed when it was
[implemented](2277df2a1e).

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [x] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct (run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: reivilibre <oliverw@element.io>
2025-09-12 08:54:20 +00:00
reivilibre
ada3a3b2b3 Add experimental support for MSC4308: Thread Subscriptions extension to Sliding Sync when MSC4306 and MSC4186 are enabled. (#18695)
Closes: #18436

Implements:
https://github.com/matrix-org/matrix-spec-proposals/pull/4308

Follows: #18674

Adds an extension to Sliding Sync and a companion
endpoint needed for backpaginating missed thread subscription changes,
as described in MSC4308

---------

Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2025-09-11 14:45:04 +01:00
Eric Eastwood
9cc4001778 Better explain logcontext in run_in_background(...) and run_as_background_process(...) (#18900)
Also adds a section in the docs explaining the `sentinel` logcontext.

Spawning from https://github.com/element-hq/synapse/pull/18870


### Testing strategy

1. Run Synapse normally and with `daemonize: true`: `poetry run
synapse_homeserver --config-path homeserver.yaml`
 1. Execute some requests
 1. Shutdown the server
 1. Look for any bad log entries in your homeserver logs:
    - `Expected logging context sentinel but found main`
    - `Expected logging context main was lost`
    - `Expected previous context`
    - `utime went backwards!`/`stime went backwards!`
- `Called stop on logcontext POST-0 without recording a start rusage`
    - `Background process re-entered without a proc`

Twisted trial tests:

 1. Run full Twisted trial test suite.
1. Check the logs for `Test starting with non-sentinel logging context ...`
2025-09-10 10:22:53 -05:00
reivilibre
c68c5dd07b Update push rules for experimental MSC4306: Thread Subscriptions to follow newer draft. (#18846)
Follows: #18762

Implements: MSC4306

Closes: #18431
Closes: #18437

Move the MSC4306 push rules to a new kind `postcontent` 

Prevent users from creating user-defined `postcontent` rules 

---------

Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
2025-09-09 18:37:04 +01:00
dependabot[bot]
92bdf77c3f Bump jsonschema from 4.25.0 to 4.25.1 (#18897)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-09 16:41:19 +01:00
dependabot[bot]
e43bf10187 Bump types-requests from 2.32.4.20250611 to 2.32.4.20250809 (#18895)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-09 16:39:44 +01:00
dependabot[bot]
6146dbad3e Bump towncrier from 24.8.0 to 25.8.0 (#18894)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-09 16:39:17 +01:00
Eric Eastwood
ca655e4020 Start background tasks after we fork the process (daemonize) (#18886)
Spawning from https://github.com/element-hq/synapse/pull/18871

[This change](6ce2f3e59d)
was originally used to fix CPU time going backwards when we `daemonize`.

While, we don't seem to run into this problem on `develop`, I still
think this is a good change to make. We don't need background tasks
running on a process that will soon be forcefully exited and where the
reactor isn't even running yet. We now kick off the background tasks
(`run_as_background_process`) after we have forked the process and
started the reactor.

Also as simple note, we don't need background tasks running in both halves of a fork.
2025-09-09 10:10:34 -05:00
dependabot[bot]
7951d41b4e Bump phonenumbers from 9.0.12 to 9.0.13 (#18893)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-09 15:53:24 +01:00
dependabot[bot]
e235099ab9 Bump log from 0.4.27 to 0.4.28 (#18892)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-09 15:52:59 +01:00
dependabot[bot]
3e865e403b Bump actions/setup-go from 5.5.0 to 6.0.0 (#18891)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-09 15:52:05 +01:00
dependabot[bot]
35e7e659f6 Bump actions/setup-python from 5.6.0 to 6.0.0 (#18890)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-09 15:49:22 +01:00
Andrew Morgan
39e4f27347 Merge branch 'master' into develop 2025-09-09 12:30:12 +01:00
reivilibre
6fe8137a4a Configure Synapse to run MSC4306: Thread Subscriptions Complement tests. (#18819)
Pairs with: https://github.com/matrix-org/complement/pull/795

Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
2025-09-09 11:40:10 +01:00
Andrew Morgan
fcffd2e897 1.138.0 2025-09-09 11:21:30 +01:00
David Baker
d48e69ad4c Fix prefixed support for MSC4133 (#18875)
This fixes two bugs that affect the availability of MSC4133 until the
next spec release.

1. The servlet didn't recognise the unstable endpoint even when the
homeserver advertised it
 2. The HS didn't advertise support for the stable prefixed version

Would only have been a problem until the next spec release but it's nice
to have it work before then.
2025-09-09 09:53:08 +01:00
Amin Farjadi
74fdbc7b75 Fix typo in structured_logging.md for file handler config (#18872) 2025-09-09 09:51:36 +01:00
Jason Little
4d55f2f301 fix: Use the Enum's value for the dictionary key when responding to an admin request for experimental features (#18874)
While exploring bring up of using `orjson`, exposed an interesting flaw.
The stdlib `json` encoder seems to be ok with coercing a `str` from an
`Enum`(specifically, a `Class[str, Enum]`). The `orjson` encoder does
not like that this is a class and not a proper `str` per spec. Using the
`.value` of the enum as the key for the dict produced while answering a
`GET` admin request for experimental features seems to fix this.
2025-09-09 09:50:09 +01:00
reivilibre
dfccde9f60 Remove obsolete and experimental /sync/e2ee endpoint. (#18583)
Introduced in: https://github.com/element-hq/synapse/pull/17167

The endpoint was part of experiments for MSC3575 but does not feature in
that MSC.

Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
2025-09-09 09:28:45 +01:00
102 changed files with 2242 additions and 901 deletions

View File

@@ -120,7 +120,7 @@ jobs:
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
- name: Install Cosign
uses: sigstore/cosign-installer@d58896d6a1865668819e1d91763c7751a165e159 # v3.9.2
uses: sigstore/cosign-installer@d7543c93d881b35a8faa02e8e3605f69b7a1ce62 # v3.10.0
- name: Calculate docker image tag
uses: docker/metadata-action@c1e51972afc2121e065aed6d45c65596fe445f3f # v5.8.0

View File

@@ -24,7 +24,7 @@ jobs:
mdbook-version: '0.4.17'
- name: Setup python
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"

View File

@@ -64,7 +64,7 @@ jobs:
run: echo 'window.SYNAPSE_VERSION = "${{ needs.pre.outputs.branch-version }}";' > ./docs/website_files/version.js
- name: Setup python
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"

View File

@@ -93,7 +93,7 @@ jobs:
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_INITDB_ARGS="--lc-collate C --lc-ctype C --encoding UTF8" \
postgres:${{ matrix.postgres-version }}
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"
- run: pip install .[all,test]
@@ -209,7 +209,7 @@ jobs:
- name: Prepare Complement's Prerequisites
run: synapse/.ci/scripts/setup_complement_prerequisites.sh
- uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # v5.5.0
- uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
with:
cache-dependency-path: complement/go.sum
go-version-file: complement/go.mod

View File

@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: '3.x'
- run: pip install tomli

View File

@@ -28,7 +28,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"
- id: set-distros
@@ -74,7 +74,7 @@ jobs:
${{ runner.os }}-buildx-
- name: Set up python
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"
@@ -134,7 +134,7 @@ jobs:
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
# setup-python@v4 doesn't impose a default python version. Need to use 3.x
# here, because `python` on osx points to Python 2.7.
@@ -166,7 +166,7 @@ jobs:
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.10"

View File

@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"
- name: Install check-jsonschema
@@ -41,7 +41,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"
- name: Install PyYAML

View File

@@ -107,7 +107,7 @@ jobs:
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"
- run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
@@ -117,7 +117,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"
- run: .ci/scripts/check_lockfile.py
@@ -199,7 +199,7 @@ jobs:
with:
ref: ${{ github.event.pull_request.head.sha }}
fetch-depth: 0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"
- run: "pip install 'towncrier>=18.6.0rc1'"
@@ -327,7 +327,7 @@ jobs:
if: ${{ needs.changes.outputs.linting_readme == 'true' }}
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"
- run: "pip install rstcheck"
@@ -377,7 +377,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.x"
- id: get-matrix
@@ -468,7 +468,7 @@ jobs:
sudo apt-get -qq install build-essential libffi-dev python3-dev \
libxml2-dev libxslt-dev xmlsec1 zlib1g-dev libjpeg-dev libwebp-dev
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: '3.9'
@@ -727,7 +727,7 @@ jobs:
- name: Prepare Complement's Prerequisites
run: synapse/.ci/scripts/setup_complement_prerequisites.sh
- uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # v5.5.0
- uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
with:
cache-dependency-path: complement/go.sum
go-version-file: complement/go.mod

View File

@@ -182,7 +182,7 @@ jobs:
- name: Prepare Complement's Prerequisites
run: synapse/.ci/scripts/setup_complement_prerequisites.sh
- uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # v5.5.0
- uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
with:
cache-dependency-path: complement/go.sum
go-version-file: complement/go.mod

View File

@@ -1,3 +1,10 @@
# Synapse 1.138.0 (2025-09-09)
No significant changes since 1.138.0rc1.
# Synapse 1.138.0rc1 (2025-09-02)
### Features

27
Cargo.lock generated
View File

@@ -753,9 +753,9 @@ checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956"
[[package]]
name = "log"
version = "0.4.27"
version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
[[package]]
name = "lru-slab"
@@ -1250,18 +1250,28 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.219"
version = "1.0.224"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
checksum = "6aaeb1e94f53b16384af593c71e20b095e958dab1d26939c1b70645c5cfbcc0b"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
name = "serde_core"
version = "1.0.224"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32f39390fa6346e24defbcdd3d9544ba8a19985d0af74df8501fbfe9a64341ab"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.219"
version = "1.0.224"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
checksum = "87ff78ab5e8561c9a675bfc1785cb07ae721f0ee53329a595cefd8c04c2ac4e0"
dependencies = [
"proc-macro2",
"quote",
@@ -1270,14 +1280,15 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.143"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a"
checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c"
dependencies = [
"itoa",
"memchr",
"ryu",
"serde",
"serde_core",
]
[[package]]

View File

@@ -0,0 +1 @@
Remove obsolete and experimental `/sync/e2ee` endpoint.

1
changelog.d/18641.bugfix Normal file
View File

@@ -0,0 +1 @@
Ensure all PDUs sent via `/send` pass canonical JSON checks.

View File

@@ -0,0 +1 @@
Add experimental support for [MSC4308: Thread Subscriptions extension to Sliding Sync](https://github.com/matrix-org/matrix-spec-proposals/pull/4308) when [MSC4306: Thread Subscriptions](https://github.com/matrix-org/matrix-spec-proposals/pull/4306) and [MSC4186: Simplified Sliding Sync](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) are enabled.

1
changelog.d/18819.misc Normal file
View File

@@ -0,0 +1 @@
Configure Synapse to run MSC4306: Thread Subscriptions Complement tests.

View File

@@ -0,0 +1 @@
Update push rules for experimental [MSC4306: Thread Subscriptions](https://github.com/matrix-org/matrix-doc/issues/4306) to follow newer draft.

View File

@@ -0,0 +1 @@
Add `get_media_upload_limits_for_user` and `on_media_upload_limit_exceeded` module API callbacks for media repository.

1
changelog.d/18856.doc Normal file
View File

@@ -0,0 +1 @@
Clarify Python dependency constraints in our deprecation policy.

1
changelog.d/18870.misc Normal file
View File

@@ -0,0 +1 @@
Remove `sentinel` logcontext usage where we log in `setup`, `start` and exit.

1
changelog.d/18874.misc Normal file
View File

@@ -0,0 +1 @@
Use the `Enum`'s value for the dictionary key when responding to an admin request for experimental features.

1
changelog.d/18875.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix prefixed support for MSC4133.

1
changelog.d/18886.misc Normal file
View File

@@ -0,0 +1 @@
Start background tasks after we fork the process (daemonize).

View File

@@ -0,0 +1 @@
Add an in-memory cache to `_get_e2e_cross_signing_signatures_for_devices` to reduce DB load.

1
changelog.d/18900.misc Normal file
View File

@@ -0,0 +1 @@
Better explain how we manage the logcontext in `run_in_background(...)` and `run_as_background_process(...)`.

1
changelog.d/18906.misc Normal file
View File

@@ -0,0 +1 @@
Better explain how we manage the logcontext in `run_in_background(...)` and `run_as_background_process(...)`.

1
changelog.d/18910.misc Normal file
View File

@@ -0,0 +1 @@
Replace usages of the deprecated `pkg_resources` interface in preparation of setuptools dropping it soon.

6
debian/changelog vendored
View File

@@ -1,3 +1,9 @@
matrix-synapse-py3 (1.138.0) stable; urgency=medium
* New Synapse release 1.138.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 09 Sep 2025 11:21:25 +0100
matrix-synapse-py3 (1.138.0~rc1) stable; urgency=medium
* New synapse release 1.138.0rc1.

View File

@@ -133,6 +133,8 @@ experimental_features:
msc3984_appservice_key_query: true
# Invite filtering
msc4155_enabled: true
# Thread Subscriptions
msc4306_enabled: true
server_notices:
system_mxid_localpart: _server

View File

@@ -1,13 +1,11 @@
Deprecation Policy for Platform Dependencies
============================================
# Deprecation Policy
Synapse has a number of platform dependencies, including Python, Rust,
PostgreSQL and SQLite. This document outlines the policy towards which versions
we support, and when we drop support for versions in the future.
Synapse has a number of **platform dependencies** (Python, Rust, PostgreSQL, and SQLite)
and **application dependencies** (Python and Rust packages). This document outlines the
policy towards which versions we support, and when we drop support for versions in the
future.
Policy
------
## Platform Dependencies
Synapse follows the upstream support life cycles for Python and PostgreSQL,
i.e. when a version reaches End of Life Synapse will withdraw support for that
@@ -26,8 +24,8 @@ The oldest supported version of SQLite is the version
[provided](https://packages.debian.org/bullseye/libsqlite3-0) by
[Debian oldstable](https://wiki.debian.org/DebianOldStable).
Context
-------
### Context
It is important for system admins to have a clear understanding of the platform
requirements of Synapse and its deprecation policies so that they can
@@ -50,4 +48,42 @@ the ecosystem.
On a similar note, SQLite does not generally have a concept of "supported
release"; bugfixes are published for the latest minor release only. We chose to
track Debian's oldstable as this is relatively conservative, predictably updated
and is consistent with the `.deb` packages released by Matrix.org.
and is consistent with the `.deb` packages released by Matrix.org.
## Application dependencies
For application-level Python dependencies, we often specify loose version constraints
(ex. `>=X.Y.Z`) to be forwards compatible with any new versions. Upper bounds (`<A.B.C`)
are only added when necessary to prevent known incompatibilities.
When selecting a minimum version, while we are mindful of the impact on downstream
package maintainers, our primary focus is on the maintainability and progress of Synapse
itself.
For developers, a Python dependency version can be considered a "no-brainer" upgrade once it is
available in both the latest [Debian Stable](https://packages.debian.org/stable/) and
[Ubuntu LTS](https://launchpad.net/ubuntu) repositories. No need to burden yourself with
extra scrutiny or consideration at this point.
We aggressively update Rust dependencies. Since these are statically linked and managed
entirely by `cargo` during build, they *can* pose no ongoing maintenance burden on others.
This allows us to freely upgrade to leverage the latest ecosystem advancements assuming
they don't have their own system-level dependencies.
### Context
Because Python dependencies can easily be managed in a virtual environment, we are less
concerned about the criteria for selecting minimum versions. The only thing of concern
is making sure we're not making it unnecessarily difficult for downstream package
maintainers. Generally, this just means avoiding the bleeding edge for a few months.
The situation for Rust dependencies is fundamentally different. For packagers, the
concerns around Python dependency versions do not apply. The `cargo` tool handles
downloading and building all libraries to satisfy dependencies, and these libraries are
statically linked into the final binary. This means that from a packager's perspective,
the Rust dependency versions are an internal build detail, not a runtime dependency to
be managed on the target system. Consequently, we have even greater flexibility to
upgrade Rust dependencies as needed for the project. Some distros (e.g. Fedora) do
package Rust libraries, but this appears to be the outlier rather than the norm.

View File

@@ -59,6 +59,28 @@ def do_request_handling():
logger.debug("phew")
```
### The `sentinel` context
The default logcontext is `synapse.logging.context.SENTINEL_CONTEXT`, which is an empty
sentinel value to represent the root logcontext. This is what is used when there is no
other logcontext set. The phrase "clear/reset the logcontext" means to set the current
logcontext to the `sentinel` logcontext.
No CPU/database usage metrics are recorded against the `sentinel` logcontext.
Ideally, nothing from the Synapse homeserver would be logged against the `sentinel`
logcontext as we want to know which server the logs came from. In practice, this is not
always the case yet especially outside of request handling.
Global things outside of Synapse (e.g. Twisted reactor code) should run in the
`sentinel` logcontext. It's only when it calls into application code that a logcontext
gets activated. This means the reactor should be started in the `sentinel` logcontext,
and any time an awaitable yields control back to the reactor, it should reset the
logcontext to be the `sentinel` logcontext. This is important to avoid leaking the
current logcontext to the reactor (which would then get picked up and associated with
the next thing the reactor does).
## Using logcontexts with awaitables
Awaitables break the linear flow of code so that there is no longer a single entry point

View File

@@ -64,3 +64,68 @@ If multiple modules implement this callback, they will be considered in order. I
returns `True`, Synapse falls through to the next one. The value of the first callback that
returns `False` will be used. If this happens, Synapse will not call any of the subsequent
implementations of this callback.
### `get_media_upload_limits_for_user`
_First introduced in Synapse v1.139.0_
```python
async def get_media_upload_limits_for_user(user_id: str, size: int) -> Optional[List[synapse.module_api.MediaUploadLimit]]
```
**<span style="color:red">
Caution: This callback is currently experimental. The method signature or behaviour
may change without notice.
</span>**
Called when processing a request to store content in the media repository. This can be used to dynamically override
the [media upload limits configuration](../usage/configuration/config_documentation.html#media_upload_limits).
The arguments passed to this callback are:
* `user_id`: The Matrix user ID of the user (e.g. `@alice:example.com`) making the request.
If the callback returns a list then it will be used as the limits instead of those in the configuration (if any).
If an empty list is returned then no limits are applied (**warning:** users will be able
to upload as much data as they desire).
If multiple modules implement this callback, they will be considered in order. If a
callback returns `None`, Synapse falls through to the next one. The value of the first
callback that does not return `None` will be used. If this happens, Synapse will not call
any of the subsequent implementations of this callback.
If there are no registered modules, or if all modules return `None`, then
the default
[media upload limits configuration](../usage/configuration/config_documentation.html#media_upload_limits)
will be used.
### `on_media_upload_limit_exceeded`
_First introduced in Synapse v1.139.0_
```python
async def on_media_upload_limit_exceeded(user_id: str, limit: synapse.module_api.MediaUploadLimit, sent_bytes: int, attempted_bytes: int) -> None
```
**<span style="color:red">
Caution: This callback is currently experimental. The method signature or behaviour
may change without notice.
</span>**
Called when a user attempts to upload media that would exceed a
[configured media upload limit](../usage/configuration/config_documentation.html#media_upload_limits).
This callback will only be called on workers which handle
[POST /_matrix/media/v3/upload](https://spec.matrix.org/v1.15/client-server-api/#post_matrixmediav3upload)
requests.
This could be used to inform the user that they have reached a media upload limit through
some external method.
The arguments passed to this callback are:
* `user_id`: The Matrix user ID of the user (e.g. `@alice:example.com`) making the request.
* `limit`: The `synapse.module_api.MediaUploadLimit` representing the limit that was reached.
* `sent_bytes`: The number of bytes already sent during the period of the limit.
* `attempted_bytes`: The number of bytes that the user attempted to send.

View File

@@ -35,7 +35,7 @@ handlers:
loggers:
synapse:
level: INFO
handlers: [remote]
handlers: [file]
synapse.storage.SQL:
level: WARNING
```

View File

@@ -2168,9 +2168,12 @@ max_upload_size: 60M
### `media_upload_limits`
*(array)* A list of media upload limits defining how much data a given user can upload in a given time period.
These limits are applied in addition to the `max_upload_size` limit above (which applies to individual uploads).
An empty list means no limits are applied.
These settings can be overridden using the `get_media_upload_limits_for_user` module API [callback](../../modules/media_repository_callbacks.md#get_media_upload_limits_for_user).
Defaults to `[]`.
Example configuration:

50
poetry.lock generated
View File

@@ -34,15 +34,15 @@ tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" a
[[package]]
name = "authlib"
version = "1.6.1"
version = "1.6.3"
description = "The ultimate Python library in building OAuth and OpenID Connect servers and clients."
optional = true
python-versions = ">=3.9"
groups = ["main"]
markers = "extra == \"all\" or extra == \"jwt\" or extra == \"oidc\""
files = [
{file = "authlib-1.6.1-py2.py3-none-any.whl", hash = "sha256:e9d2031c34c6309373ab845afc24168fe9e93dc52d252631f52642f21f5ed06e"},
{file = "authlib-1.6.1.tar.gz", hash = "sha256:4dffdbb1460ba6ec8c17981a4c67af7d8af131231b5a36a88a1e8c80c111cdfd"},
{file = "authlib-1.6.3-py2.py3-none-any.whl", hash = "sha256:7ea0f082edd95a03b7b72edac65ec7f8f68d703017d7e37573aee4fc603f2a48"},
{file = "authlib-1.6.3.tar.gz", hash = "sha256:9f7a982cc395de719e4c2215c5707e7ea690ecf84f1ab126f28c053f4219e610"},
]
[package.dependencies]
@@ -919,14 +919,14 @@ i18n = ["Babel (>=2.7)"]
[[package]]
name = "jsonschema"
version = "4.25.0"
version = "4.25.1"
description = "An implementation of JSON Schema validation for Python"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "jsonschema-4.25.0-py3-none-any.whl", hash = "sha256:24c2e8da302de79c8b9382fee3e76b355e44d2a4364bb207159ce10b517bd716"},
{file = "jsonschema-4.25.0.tar.gz", hash = "sha256:e63acf5c11762c0e6672ffb61482bdf57f0876684d8d249c0fe2d730d48bc55f"},
{file = "jsonschema-4.25.1-py3-none-any.whl", hash = "sha256:3fba0169e345c7175110351d456342c364814cfcf3b964ba4587f22915230a63"},
{file = "jsonschema-4.25.1.tar.gz", hash = "sha256:e4a9655ce0da0c0b67a085847e00a3a51449e1157f4f75e9fb5aa545e122eb85"},
]
[package.dependencies]
@@ -1531,14 +1531,14 @@ files = [
[[package]]
name = "phonenumbers"
version = "9.0.12"
version = "9.0.13"
description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers."
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "phonenumbers-9.0.12-py2.py3-none-any.whl", hash = "sha256:900633afc3e12191458d710262df5efc117838bd1e2e613b64fa254a86bb20a1"},
{file = "phonenumbers-9.0.12.tar.gz", hash = "sha256:ccadff6b949494bd606836d8c9678bee5b55cb1cbad1e98bf7adae108e6fd0be"},
{file = "phonenumbers-9.0.13-py2.py3-none-any.whl", hash = "sha256:b97661e177773e7509c6d503e0f537cd0af22aa3746231654590876eb9430915"},
{file = "phonenumbers-9.0.13.tar.gz", hash = "sha256:eca06e01382412c45316868f86a44bb217c02f9ee7196589041556a2f54a7639"},
]
[[package]]
@@ -1774,14 +1774,14 @@ files = [
[[package]]
name = "pydantic"
version = "2.11.7"
version = "2.11.9"
description = "Data validation using Python type hints"
optional = false
python-versions = ">=3.9"
groups = ["main", "dev"]
files = [
{file = "pydantic-2.11.7-py3-none-any.whl", hash = "sha256:dde5df002701f6de26248661f6835bbe296a47bf73990135c7d07ce741b9623b"},
{file = "pydantic-2.11.7.tar.gz", hash = "sha256:d989c3c6cb79469287b1569f7447a17848c998458d49ebe294e975b9baf0f0db"},
{file = "pydantic-2.11.9-py3-none-any.whl", hash = "sha256:c42dd626f5cfc1c6950ce6205ea58c93efa406da65f479dcb4029d5934857da2"},
{file = "pydantic-2.11.9.tar.gz", hash = "sha256:6b8ffda597a14812a7975c90b82a8a2e777d9257aba3453f973acd3c032a18e2"},
]
[package.dependencies]
@@ -2747,14 +2747,14 @@ files = [
[[package]]
name = "towncrier"
version = "24.8.0"
version = "25.8.0"
description = "Building newsfiles for your project."
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
groups = ["dev"]
files = [
{file = "towncrier-24.8.0-py3-none-any.whl", hash = "sha256:9343209592b839209cdf28c339ba45792fbfe9775b5f9c177462fd693e127d8d"},
{file = "towncrier-24.8.0.tar.gz", hash = "sha256:013423ee7eed102b2f393c287d22d95f66f1a3ea10a4baa82d298001a7f18af3"},
{file = "towncrier-25.8.0-py3-none-any.whl", hash = "sha256:b953d133d98f9aeae9084b56a3563fd2519dfc6ec33f61c9cd2c61ff243fb513"},
{file = "towncrier-25.8.0.tar.gz", hash = "sha256:eef16d29f831ad57abb3ae32a0565739866219f1ebfbdd297d32894eb9940eb1"},
]
[package.dependencies]
@@ -2971,14 +2971,14 @@ files = [
[[package]]
name = "types-psycopg2"
version = "2.9.21.20250809"
version = "2.9.21.20250915"
description = "Typing stubs for psycopg2"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
files = [
{file = "types_psycopg2-2.9.21.20250809-py3-none-any.whl", hash = "sha256:59b7b0ed56dcae9efae62b8373497274fc1a0484bdc5135cdacbe5a8f44e1d7b"},
{file = "types_psycopg2-2.9.21.20250809.tar.gz", hash = "sha256:b7c2cbdcf7c0bd16240f59ba694347329b0463e43398de69784ea4dee45f3c6d"},
{file = "types_psycopg2-2.9.21.20250915-py3-none-any.whl", hash = "sha256:eefe5ccdc693fc086146e84c9ba437bb278efe1ef330b299a0cb71169dc6c55f"},
{file = "types_psycopg2-2.9.21.20250915.tar.gz", hash = "sha256:bfeb8f54c32490e7b5edc46215ab4163693192bc90407b4a023822de9239f5c8"},
]
[[package]]
@@ -3011,14 +3011,14 @@ files = [
[[package]]
name = "types-requests"
version = "2.32.4.20250611"
version = "2.32.4.20250809"
description = "Typing stubs for requests"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
files = [
{file = "types_requests-2.32.4.20250611-py3-none-any.whl", hash = "sha256:ad2fe5d3b0cb3c2c902c8815a70e7fb2302c4b8c1f77bdcd738192cdb3878072"},
{file = "types_requests-2.32.4.20250611.tar.gz", hash = "sha256:741c8777ed6425830bf51e54d6abe245f79b4dcb9019f1622b773463946bf826"},
{file = "types_requests-2.32.4.20250809-py3-none-any.whl", hash = "sha256:f73d1832fb519ece02c85b1f09d5f0dd3108938e7d47e7f94bbfa18a6782b163"},
{file = "types_requests-2.32.4.20250809.tar.gz", hash = "sha256:d8060de1c8ee599311f56ff58010fb4902f462a1470802cf9f6ed27bc46c4df3"},
]
[package.dependencies]
@@ -3026,14 +3026,14 @@ urllib3 = ">=2"
[[package]]
name = "types-setuptools"
version = "80.9.0.20250809"
version = "80.9.0.20250822"
description = "Typing stubs for setuptools"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
files = [
{file = "types_setuptools-80.9.0.20250809-py3-none-any.whl", hash = "sha256:7c6539b4c7ac7b4ab4db2be66d8a58fb1e28affa3ee3834be48acafd94f5976a"},
{file = "types_setuptools-80.9.0.20250809.tar.gz", hash = "sha256:e986ba37ffde364073d76189e1d79d9928fb6f5278c7d07589cde353d0218864"},
{file = "types_setuptools-80.9.0.20250822-py3-none-any.whl", hash = "sha256:53bf881cb9d7e46ed12c76ef76c0aaf28cfe6211d3fab12e0b83620b1a8642c3"},
{file = "types_setuptools-80.9.0.20250822.tar.gz", hash = "sha256:070ea7716968ec67a84c7f7768d9952ff24d28b65b6594797a464f1b3066f965"},
]
[[package]]

View File

@@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.138.0rc1"
version = "1.138.0"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"

View File

@@ -289,10 +289,10 @@ pub const BASE_APPEND_CONTENT_RULES: &[PushRule] = &[PushRule {
default_enabled: true,
}];
pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
pub const BASE_APPEND_POSTCONTENT_RULES: &[PushRule] = &[
PushRule {
rule_id: Cow::Borrowed("global/content/.io.element.msc4306.rule.unsubscribed_thread"),
priority_class: 1,
rule_id: Cow::Borrowed("global/postcontent/.io.element.msc4306.rule.unsubscribed_thread"),
priority_class: 6,
conditions: Cow::Borrowed(&[Condition::Known(
KnownCondition::Msc4306ThreadSubscription { subscribed: false },
)]),
@@ -301,8 +301,8 @@ pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
default_enabled: true,
},
PushRule {
rule_id: Cow::Borrowed("global/content/.io.element.msc4306.rule.subscribed_thread"),
priority_class: 1,
rule_id: Cow::Borrowed("global/postcontent/.io.element.msc4306.rule.subscribed_thread"),
priority_class: 6,
conditions: Cow::Borrowed(&[Condition::Known(
KnownCondition::Msc4306ThreadSubscription { subscribed: true },
)]),
@@ -310,6 +310,9 @@ pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
default: true,
default_enabled: true,
},
];
pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
PushRule {
rule_id: Cow::Borrowed("global/underride/.m.rule.call"),
priority_class: 1,
@@ -726,6 +729,7 @@ lazy_static! {
.iter()
.chain(BASE_APPEND_OVERRIDE_RULES.iter())
.chain(BASE_APPEND_CONTENT_RULES.iter())
.chain(BASE_APPEND_POSTCONTENT_RULES.iter())
.chain(BASE_APPEND_UNDERRIDE_RULES.iter())
.map(|rule| { (&*rule.rule_id, rule) })
.collect();

View File

@@ -527,6 +527,7 @@ impl PushRules {
.chain(base_rules::BASE_APPEND_OVERRIDE_RULES.iter())
.chain(self.content.iter())
.chain(base_rules::BASE_APPEND_CONTENT_RULES.iter())
.chain(base_rules::BASE_APPEND_POSTCONTENT_RULES.iter())
.chain(self.room.iter())
.chain(self.sender.iter())
.chain(self.underride.iter())

View File

@@ -2415,8 +2415,15 @@ properties:
A list of media upload limits defining how much data a given user can
upload in a given time period.
These limits are applied in addition to the `max_upload_size` limit above
(which applies to individual uploads).
An empty list means no limits are applied.
These settings can be overridden using the `get_media_upload_limits_for_user`
module API [callback](../../modules/media_repository_callbacks.md#get_media_upload_limits_for_user).
default: []
items:
time_period:

View File

@@ -230,6 +230,7 @@ test_packages=(
./tests/msc3967
./tests/msc4140
./tests/msc4155
./tests/msc4306
)
# Enable dirty runs, so tests will reuse the same container where possible.

View File

@@ -120,6 +120,13 @@ def main() -> None:
# DB.
hs.setup()
# This will cause all of the relevant storage classes to be instantiated and call
# `register_background_update_handler(...)`,
# `register_background_index_update(...)`,
# `register_background_validate_constraint(...)`, etc so they are available to use
# if we are asked to run those background updates.
hs.get_storage_controllers()
if args.run_background_updates:
run_background_updates(hs)

View File

@@ -72,7 +72,7 @@ from synapse.events.auto_accept_invites import InviteAutoAccepter
from synapse.events.presence_router import load_legacy_presence_router
from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.http.site import SynapseSite
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.context import LoggingContext, PreserveLoggingContext
from synapse.logging.opentracing import init_tracer
from synapse.metrics import install_gc_manager, register_threadpool
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -183,25 +183,23 @@ def start_reactor(
if gc_thresholds:
gc.set_threshold(*gc_thresholds)
install_gc_manager()
run_command()
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
#
# We also need to drop the logcontext before forking if we're daemonizing,
# otherwise the cputime metrics get confused about the per-thread resource usage
# appearing to go backwards.
with PreserveLoggingContext():
if daemonize:
assert pid_file is not None
# Reset the logging context when we start the reactor (whenever we yield control
# to the reactor, the `sentinel` logging context needs to be set so we don't
# leak the current logging context and erroneously apply it to the next task the
# reactor event loop picks up)
with PreserveLoggingContext():
run_command()
if print_pidfile:
print(pid_file)
if daemonize:
assert pid_file is not None
daemonize_process(pid_file, logger)
run()
if print_pidfile:
print(pid_file)
daemonize_process(pid_file, logger)
run()
def quit_with_error(error_string: str) -> NoReturn:
@@ -601,18 +599,38 @@ async def start(hs: "HomeServer") -> None:
hs.get_datastores().main.db_pool.start_profiling()
hs.get_pusherpool().start()
def log_shutdown() -> None:
with LoggingContext("log_shutdown"):
logger.info("Shutting down...")
# Log when we start the shut down process.
hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", logger.info, "Shutting down..."
)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", log_shutdown)
setup_sentry(hs)
setup_sdnotify(hs)
# If background tasks are running on the main process or this is the worker in
# charge of them, start collecting the phone home stats and shared usage metrics.
# Register background tasks required by this server. This must be done
# somewhat manually due to the background tasks not being registered
# unless handlers are instantiated.
#
# While we could "start" these before the reactor runs, nothing will happen until
# the reactor is running, so we may as well do it here in `start`.
#
# Additionally, this means we also start them after we daemonize and fork the
# process which means we can avoid any potential problems with cputime metrics
# getting confused about the per-thread resource usage appearing to go backwards
# because we're comparing the resource usage (`rusage`) from the original process to
# the forked process.
if hs.config.worker.run_background_tasks:
hs.start_background_tasks()
# TODO: This should be moved to same pattern we use for other background tasks:
# Add to `REQUIRED_ON_BACKGROUND_TASK_STARTUP` and rely on
# `start_background_tasks` to start it.
await hs.get_common_usage_metrics_manager().setup()
# TODO: This feels like another pattern that should refactored as one of the
# `REQUIRED_ON_BACKGROUND_TASK_STARTUP`
start_phone_stats_home(hs)
# We now freeze all allocated objects in the hopes that (almost)

View File

@@ -355,7 +355,12 @@ def start(config_options: List[str]) -> None:
except Exception as e:
handle_startup_exception(e)
register_start(_base.start, hs)
async def start() -> None:
# Re-establish log context now that we're back from the reactor
with LoggingContext("start"):
await _base.start(hs)
register_start(start)
# redirect stdio to the logs, if configured.
if not hs.config.logging.no_redirect_stdio:

View File

@@ -377,15 +377,17 @@ def setup(config_options: List[str]) -> SynapseHomeServer:
handle_startup_exception(e)
async def start() -> None:
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc.oidc_enabled:
oidc = hs.get_oidc_handler()
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()
# Re-establish log context now that we're back from the reactor
with LoggingContext("start"):
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc.oidc_enabled:
oidc = hs.get_oidc_handler()
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()
await _base.start(hs)
await _base.start(hs)
hs.get_datastores().main.db_pool.updates.start_doing_background_updates()
hs.get_datastores().main.db_pool.updates.start_doing_background_updates()
register_start(start)

View File

@@ -22,6 +22,7 @@
import argparse
import errno
import importlib.resources as importlib_resources
import logging
import os
import re
@@ -46,7 +47,6 @@ from typing import (
import attr
import jinja2
import pkg_resources
import yaml
from synapse.types import StrSequence
@@ -174,8 +174,8 @@ class Config:
self.root = root_config
# Get the path to the default Synapse template directory
self.default_template_dir = pkg_resources.resource_filename(
"synapse", "res/templates"
self.default_template_dir = str(
importlib_resources.files("synapse").joinpath("res").joinpath("templates")
)
@staticmethod

View File

@@ -590,5 +590,5 @@ class ExperimentalConfig(Config):
self.msc4293_enabled: bool = experimental.get("msc4293_enabled", False)
# MSC4306: Thread Subscriptions
# (and MSC4308: sliding sync extension for thread subscriptions)
# (and MSC4308: Thread Subscriptions extension to Sliding Sync)
self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False)

View File

@@ -18,13 +18,13 @@
# [This file includes modifications made by New Vector Limited]
#
#
import importlib.resources as importlib_resources
import json
import re
from typing import Any, Dict, Iterable, List, Optional, Pattern
from urllib import parse as urlparse
import attr
import pkg_resources
from synapse.types import JsonDict, StrSequence
@@ -64,7 +64,12 @@ class OembedConfig(Config):
"""
# Whether to use the packaged providers.json file.
if not oembed_config.get("disable_default_providers") or False:
with pkg_resources.resource_stream("synapse", "res/providers.json") as s:
path = (
importlib_resources.files("synapse")
.joinpath("res")
.joinpath("providers.json")
)
with path.open("r", encoding="utf-8") as s:
providers = json.load(s)
yield from self._parse_and_validate_provider(

View File

@@ -120,11 +120,19 @@ def parse_thumbnail_requirements(
@attr.s(auto_attribs=True, slots=True, frozen=True)
class MediaUploadLimit:
"""A limit on the amount of data a user can upload in a given time
period."""
"""
Represents a limit on the amount of data a user can upload in a given time
period.
These can be configured through the `media_upload_limits` [config option](https://element-hq.github.io/synapse/latest/usage/configuration/config_documentation.html#media_upload_limits)
or via the `get_media_upload_limits_for_user` module API [callback](https://element-hq.github.io/synapse/latest/modules/media_repository_callbacks.html#get_media_upload_limits_for_user).
"""
max_bytes: int
"""The maximum number of bytes that can be uploaded in the given time period."""
time_period_ms: int
"""The time period in milliseconds."""
class ContentRepositoryConfig(Config):

View File

@@ -26,7 +26,7 @@ from synapse.api.constants import EduTypes
from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.federation.units import Edu, Transaction, serialize_and_filter_pdus
from synapse.logging.opentracing import (
extract_text_map,
set_tag,
@@ -119,7 +119,7 @@ class TransactionManager:
transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=[p.get_pdu_json() for p in pdus],
pdus=serialize_and_filter_pdus(pdus),
edus=[edu.get_dict() for edu in edus],
)

View File

@@ -135,7 +135,7 @@ class PublicRoomList(BaseFederationServlet):
if not self.allow_access:
raise FederationDeniedError(origin)
limit = parse_integer_from_args(query, "limit", 0)
limit: Optional[int] = parse_integer_from_args(query, "limit", 0)
since_token = parse_string_from_args(query, "since", None)
include_all_networks = parse_boolean_from_args(
query, "include_all_networks", default=False

View File

@@ -211,7 +211,7 @@ class SlidingSyncHandler:
Args:
sync_config: Sync configuration
to_token: The point in the stream to sync up to.
to_token: The latest point in the stream to sync up to.
from_token: The point in the stream to sync from. Token of the end of the
previous batch. May be `None` if this is the initial sync request.
"""

View File

@@ -27,7 +27,7 @@ from typing import (
cast,
)
from typing_extensions import assert_never
from typing_extensions import TypeAlias, assert_never
from synapse.api.constants import AccountDataTypes, EduTypes
from synapse.handlers.receipts import ReceiptEventSource
@@ -40,6 +40,7 @@ from synapse.types import (
SlidingSyncStreamToken,
StrCollection,
StreamToken,
ThreadSubscriptionsToken,
)
from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
@@ -54,6 +55,13 @@ from synapse.util.async_helpers import (
gather_optional_coroutines,
)
_ThreadSubscription: TypeAlias = (
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadSubscription
)
_ThreadUnsubscription: TypeAlias = (
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadUnsubscription
)
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -68,6 +76,7 @@ class SlidingSyncExtensionHandler:
self.event_sources = hs.get_event_sources()
self.device_handler = hs.get_device_handler()
self.push_rules_handler = hs.get_push_rules_handler()
self._enable_thread_subscriptions = hs.config.experimental.msc4306_enabled
@trace
async def get_extensions_response(
@@ -93,7 +102,7 @@ class SlidingSyncExtensionHandler:
actual_room_ids: The actual room IDs in the the Sliding Sync response.
actual_room_response_map: A map of room ID to room results in the the
Sliding Sync response.
to_token: The point in the stream to sync up to.
to_token: The latest point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""
@@ -156,18 +165,32 @@ class SlidingSyncExtensionHandler:
from_token=from_token,
)
thread_subs_coro = None
if (
sync_config.extensions.thread_subscriptions is not None
and self._enable_thread_subscriptions
):
thread_subs_coro = self.get_thread_subscriptions_extension_response(
sync_config=sync_config,
thread_subscriptions_request=sync_config.extensions.thread_subscriptions,
to_token=to_token,
from_token=from_token,
)
(
to_device_response,
e2ee_response,
account_data_response,
receipts_response,
typing_response,
thread_subs_response,
) = await gather_optional_coroutines(
to_device_coro,
e2ee_coro,
account_data_coro,
receipts_coro,
typing_coro,
thread_subs_coro,
)
return SlidingSyncResult.Extensions(
@@ -176,6 +199,7 @@ class SlidingSyncExtensionHandler:
account_data=account_data_response,
receipts=receipts_response,
typing=typing_response,
thread_subscriptions=thread_subs_response,
)
def find_relevant_room_ids_for_extension(
@@ -877,3 +901,72 @@ class SlidingSyncExtensionHandler:
return SlidingSyncResult.Extensions.TypingExtension(
room_id_to_typing_map=room_id_to_typing_map,
)
async def get_thread_subscriptions_extension_response(
self,
sync_config: SlidingSyncConfig,
thread_subscriptions_request: SlidingSyncConfig.Extensions.ThreadSubscriptionsExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.ThreadSubscriptionsExtension]:
"""Handle Thread Subscriptions extension (MSC4308)
Args:
sync_config: Sync configuration
thread_subscriptions_request: The thread_subscriptions extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
Returns:
the response (None if empty or thread subscriptions are disabled)
"""
if not thread_subscriptions_request.enabled:
return None
limit = thread_subscriptions_request.limit
if from_token:
from_stream_id = from_token.stream_token.thread_subscriptions_key
else:
from_stream_id = StreamToken.START.thread_subscriptions_key
to_stream_id = to_token.thread_subscriptions_key
updates = await self.store.get_latest_updated_thread_subscriptions_for_user(
user_id=sync_config.user.to_string(),
from_id=from_stream_id,
to_id=to_stream_id,
limit=limit,
)
if len(updates) == 0:
return None
subscribed_threads: Dict[str, Dict[str, _ThreadSubscription]] = {}
unsubscribed_threads: Dict[str, Dict[str, _ThreadUnsubscription]] = {}
for stream_id, room_id, thread_root_id, subscribed, automatic in updates:
if subscribed:
subscribed_threads.setdefault(room_id, {})[thread_root_id] = (
_ThreadSubscription(
automatic=automatic,
bump_stamp=stream_id,
)
)
else:
unsubscribed_threads.setdefault(room_id, {})[thread_root_id] = (
_ThreadUnsubscription(bump_stamp=stream_id)
)
prev_batch = None
if len(updates) == limit:
# Tell the client about a potential gap where there may be more
# thread subscriptions for it to backpaginate.
# We subtract one because the 'later in the stream' bound is inclusive,
# and we already saw the element at index 0.
prev_batch = ThreadSubscriptionsToken(updates[0][0] - 1)
return SlidingSyncResult.Extensions.ThreadSubscriptionsExtension(
subscribed=subscribed_threads,
unsubscribed=unsubscribed_threads,
prev_batch=prev_batch,
)

View File

@@ -20,7 +20,6 @@
#
import itertools
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
@@ -28,14 +27,11 @@ from typing import (
Dict,
FrozenSet,
List,
Literal,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
overload,
)
import attr
@@ -120,25 +116,6 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
SyncRequestKey = Tuple[Any, ...]
class SyncVersion(Enum):
"""
Enum for specifying the version of sync request. This is used to key which type of
sync response that we are generating.
This is different than the `sync_type` you might see used in other code below; which
specifies the sub-type sync request (e.g. initial_sync, full_state_sync,
incremental_sync) and is really only relevant for the `/sync` v2 endpoint.
"""
# These string values are semantically significant because they are used in the the
# metrics
# Traditional `/sync` endpoint
SYNC_V2 = "sync_v2"
# Part of MSC3575 Sliding Sync
E2EE_SYNC = "e2ee_sync"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
user: UserID
@@ -308,26 +285,6 @@ class SyncResult:
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class E2eeSyncResult:
"""
Attributes:
next_batch: Token for the next sync
to_device: List of direct messages for the device.
device_lists: List of user_ids whose devices have changed
device_one_time_keys_count: Dict of algorithm to count for one time keys
for this device
device_unused_fallback_key_types: List of key types that have an unused fallback
key
"""
next_batch: StreamToken
to_device: List[JsonDict]
device_lists: DeviceListUpdates
device_one_time_keys_count: JsonMapping
device_unused_fallback_key_types: List[str]
class SyncHandler:
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
@@ -373,52 +330,15 @@ class SyncHandler:
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult: ...
@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> E2eeSyncResult: ...
@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]: ...
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]:
) -> SyncResult:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
@@ -433,8 +353,7 @@ class SyncHandler:
full_state: Whether to return the full state for each room.
Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
returns a full `SyncResult`.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
@@ -446,7 +365,6 @@ class SyncHandler:
request_key,
self._wait_for_sync_for_user,
sync_config,
sync_version,
since_token,
timeout,
full_state,
@@ -455,48 +373,14 @@ class SyncHandler:
logger.debug("Returning sync response for %s", user_id)
return res
@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> SyncResult: ...
@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> E2eeSyncResult: ...
@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> Union[SyncResult, E2eeSyncResult]: ...
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> Union[SyncResult, E2eeSyncResult]:
) -> SyncResult:
"""The start of the machinery that produces a /sync response.
See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
@@ -517,7 +401,7 @@ class SyncHandler:
else:
sync_type = "incremental_sync"
sync_label = f"{sync_version}:{sync_type}"
sync_label = f"sync_v2:{sync_type}"
context = current_context()
if context:
@@ -578,19 +462,15 @@ class SyncHandler:
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result: Union[
SyncResult, E2eeSyncResult
] = await self.current_sync_for_user(
sync_config, sync_version, since_token, full_state=full_state
result = await self.current_sync_for_user(
sync_config, since_token, full_state=full_state
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> Union[SyncResult, E2eeSyncResult]:
return await self.current_sync_for_user(
sync_config, sync_version, since_token
)
) -> SyncResult:
return await self.current_sync_for_user(sync_config, since_token)
result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
@@ -623,43 +503,15 @@ class SyncHandler:
return result
@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult: ...
@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> E2eeSyncResult: ...
@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]: ...
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]:
) -> SyncResult:
"""
Generates the response body of a sync result, represented as a
`SyncResult`/`E2eeSyncResult`.
`SyncResult`.
This is a wrapper around `generate_sync_result` which starts an open tracing
span to track the sync. See `generate_sync_result` for the next part of your
@@ -672,28 +524,15 @@ class SyncHandler:
full_state: Whether to return the full state for each room.
Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
returns a full `SyncResult`.
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
# Go through the `/sync` v2 path
if sync_version == SyncVersion.SYNC_V2:
sync_result: Union[
SyncResult, E2eeSyncResult
] = await self.generate_sync_result(
sync_config, since_token, full_state
)
# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
elif sync_version == SyncVersion.E2EE_SYNC:
sync_result = await self.generate_e2ee_sync_result(
sync_config, since_token
)
else:
raise Exception(
f"Unknown sync_version (this is a Synapse problem): {sync_version}"
)
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)
set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
@@ -1968,102 +1807,6 @@ class SyncHandler:
next_batch=sync_result_builder.now_token,
)
async def generate_e2ee_sync_result(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
) -> E2eeSyncResult:
"""
Generates the response body of a MSC3575 Sliding Sync `/sync/e2ee` result.
This is represented by a `E2eeSyncResult` struct, which is built from small
pieces using a `SyncResultBuilder`. The `sync_result_builder` is passed as a
mutable ("inout") parameter to various helper functions. These retrieve and
process the data which forms the sync body, often writing to the
`sync_result_builder` to store their output.
At the end, we transfer data from the `sync_result_builder` to a new `E2eeSyncResult`
instance to signify that the sync calculation is complete.
"""
user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
# We no longer support AS users using /sync directly.
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
sync_result_builder = await self.get_sync_result_builder(
sync_config,
since_token,
full_state=False,
)
# 1. Calculate `to_device` events
await self._generate_sync_entry_for_to_device(sync_result_builder)
# 2. Calculate `device_lists`
# Device list updates are sent if a since token is provided.
device_lists = DeviceListUpdates()
include_device_list_updates = bool(since_token and since_token.device_list_key)
if include_device_list_updates:
# Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
# is used in calculate_user_changes below.
#
# TODO: Running `_generate_sync_entry_for_rooms()` is a lot of work just to
# figure out the membership changes/derived info needed for
# `_generate_sync_entry_for_device_list()`. In the future, we should try to
# refactor this away.
(
newly_joined_rooms,
newly_left_rooms,
) = await self._generate_sync_entry_for_rooms(sync_result_builder)
# This uses the sync_result_builder.joined which is set in
# `_generate_sync_entry_for_rooms`, if that didn't find any joined
# rooms for some reason it is a no-op.
(
newly_joined_or_invited_or_knocked_users,
newly_left_users,
) = sync_result_builder.calculate_user_changes()
# include_device_list_updates can only be True if we have a
# since token.
assert since_token is not None
device_lists = await self._device_handler.generate_sync_entry_for_device_list(
user_id=user_id,
since_token=since_token,
now_token=sync_result_builder.now_token,
joined_room_ids=sync_result_builder.joined_room_ids,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)
# 3. Calculate `device_one_time_keys_count` and `device_unused_fallback_key_types`
device_id = sync_config.device_id
one_time_keys_count: JsonMapping = {}
unused_fallback_key_types: List[str] = []
if device_id:
# TODO: We should have a way to let clients differentiate between the states of:
# * no change in OTK count since the provided since token
# * the server has zero OTKs left for this device
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
one_time_keys_count = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
unused_fallback_key_types = list(
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
)
return E2eeSyncResult(
to_device=sync_result_builder.to_device,
device_lists=device_lists,
device_one_time_keys_count=one_time_keys_count,
device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)
async def get_sync_result_builder(
self,
sync_config: SyncConfig,

View File

@@ -9,7 +9,7 @@ from synapse.storage.databases.main.thread_subscriptions import (
AutomaticSubscriptionConflicted,
ThreadSubscription,
)
from synapse.types import EventOrderings, UserID
from synapse.types import EventOrderings, StreamKeyType, UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -22,6 +22,7 @@ class ThreadSubscriptionsHandler:
self.store = hs.get_datastores().main
self.event_handler = hs.get_event_handler()
self.auth = hs.get_auth()
self._notifier = hs.get_notifier()
async def get_thread_subscription_settings(
self,
@@ -132,6 +133,15 @@ class ThreadSubscriptionsHandler:
errcode=Codes.MSC4306_CONFLICTING_UNSUBSCRIPTION,
)
if outcome is not None:
# wake up user streams (e.g. sliding sync) on the same worker
self._notifier.on_new_event(
StreamKeyType.THREAD_SUBSCRIPTIONS,
# outcome is a stream_id
outcome,
users=[user_id.to_string()],
)
return outcome
async def unsubscribe_user_from_thread(
@@ -162,8 +172,19 @@ class ThreadSubscriptionsHandler:
logger.info("rejecting thread subscriptions change (thread not accessible)")
raise NotFoundError("No such thread root")
return await self.store.unsubscribe_user_from_thread(
outcome = await self.store.unsubscribe_user_from_thread(
user_id.to_string(),
event.room_id,
thread_root_event_id,
)
if outcome is not None:
# wake up user streams (e.g. sliding sync) on the same worker
self._notifier.on_new_event(
StreamKeyType.THREAD_SUBSCRIPTIONS,
# outcome is a stream_id
outcome,
users=[user_id.to_string()],
)
return outcome

View File

@@ -130,6 +130,16 @@ def parse_integer(
return parse_integer_from_args(args, name, default, required, negative)
@overload
def parse_integer_from_args(
args: Mapping[bytes, Sequence[bytes]],
name: str,
default: int,
required: Literal[False] = False,
negative: bool = False,
) -> int: ...
@overload
def parse_integer_from_args(
args: Mapping[bytes, Sequence[bytes]],

View File

@@ -227,7 +227,16 @@ LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"]
class _Sentinel:
"""Sentinel to represent the root context"""
"""
Sentinel to represent the root context
This should only be used for tasks outside of Synapse like when we yield control
back to the Twisted reactor (event loop) so we don't leak the current logging
context to other tasks that are scheduled next in the event loop.
Nothing from the Synapse homeserver should be logged with the sentinel context. i.e.
we should always know which server the logs are coming from.
"""
__slots__ = ["previous_context", "finished", "request", "tag"]
@@ -616,9 +625,17 @@ class LoggingContextFilter(logging.Filter):
class PreserveLoggingContext:
"""Context manager which replaces the logging context
"""
Context manager which replaces the logging context
The previous logging context is restored on exit."""
The previous logging context is restored on exit.
`make_deferred_yieldable` is pretty equivalent to using `with
PreserveLoggingContext():` (using the default sentinel context), i.e. it clears the
logcontext before awaiting (and so before execution passes back to the reactor) and
restores the old context once the awaitable completes (execution passes from the
reactor back to the code).
"""
__slots__ = ["_old_context", "_new_context"]
@@ -784,6 +801,15 @@ def run_in_background(
return from the function, and that the sentinel context is set once the
deferred returned by the function completes.
To explain how the log contexts work here:
- When `run_in_background` is called, the current context is stored ("original"),
we kick off the background task in the current context, and we restore that
original context before returning
- When the background task finishes, we don't want to leak our context into the
reactor which would erroneously get attached to the next operation picked up by
the event loop. We add a callback to the deferred which will clear the logging
context after it finishes and yields control back to the reactor.
Useful for wrapping functions that return a deferred or coroutine, which you don't
yield or await on (for instance because you want to pass it to
deferred.gatherResults()).
@@ -795,9 +821,15 @@ def run_in_background(
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
CRITICAL error about an unhandled error will be logged without much
indication about where it came from.
Returns:
Deferred which returns the result of func, or `None` if func raises.
Note that the returned Deferred does not follow the synapse logcontext
rules.
"""
current = current_context()
calling_context = current_context()
try:
# (kick off the task in the current context)
res = f(*args, **kwargs)
except Exception:
# the assumption here is that the caller doesn't want to be disturbed
@@ -806,6 +838,9 @@ def run_in_background(
# `res` may be a coroutine, `Deferred`, some other kind of awaitable, or a plain
# value. Convert it to a `Deferred`.
#
# Wrapping the value in a deferred has the side effect of executing the coroutine,
# if it is one. If it's already a deferred, then we can just use that.
d: "defer.Deferred[R]"
if isinstance(res, typing.Coroutine):
# Wrap the coroutine in a `Deferred`.
@@ -820,20 +855,24 @@ def run_in_background(
# `res` is a plain value. Wrap it in a `Deferred`.
d = defer.succeed(res)
# The deferred has already completed
if d.called and not d.paused:
# The function should have maintained the logcontext, so we can
# optimise out the messing about
return d
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The function may have reset the context before returning, so we need to restore it
# now.
#
# Our goal is to have the caller logcontext unchanged after firing off the
# background task and returning.
set_current_context(calling_context)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
# The original logcontext will be restored when the deferred completes, but
# there is nothing waiting for it, so it will get leaked into the reactor (which
# would then get picked up by the next thing the reactor does). We therefore
# need to reset the logcontext here (set the `sentinel` logcontext) before
# yielding control back to the reactor.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
@@ -841,7 +880,7 @@ def run_in_background(
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
return d
@@ -859,20 +898,34 @@ def run_coroutine_in_background(
coroutine directly rather than a function. We can do this because coroutines
do not run until called, and so calling an async function without awaiting
cannot change the log contexts.
"""
current = current_context()
This is an ergonomic helper so we can do this:
```python
run_coroutine_in_background(func1(arg1))
```
Rather than having to do this:
```python
run_in_background(lambda: func1(arg1))
```
"""
calling_context = current_context()
# Wrap the coroutine in a deferred, which will have the side effect of executing the
# coroutine in the background.
d = defer.ensureDeferred(coroutine)
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The function may have reset the context before returning, so we need to restore it
# now.
#
# Our goal is to have the caller logcontext unchanged after firing off the
# background task and returning.
set_current_context(calling_context)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
# The original logcontext will be restored when the deferred completes, but
# there is nothing waiting for it, so it will get leaked into the reactor (which
# would then get picked up by the next thing the reactor does). We therefore
# need to reset the logcontext here (set the `sentinel` logcontext) before
# yielding control back to the reactor.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
@@ -880,7 +933,7 @@ def run_coroutine_in_background(
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
return d
@@ -888,24 +941,43 @@ T = TypeVar("T")
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
"""Given a deferred, make it follow the Synapse logcontext rules:
If the deferred has completed, essentially does nothing (just returns another
completed deferred with the result/failure).
If the deferred has not yet completed, resets the logcontext before
returning a deferred. Then, when the deferred completes, restores the
current logcontext before running callbacks/errbacks.
(This is more-or-less the opposite operation to run_in_background.)
"""
Given a deferred, make it follow the Synapse logcontext rules:
- If the deferred has completed, essentially does nothing (just returns another
completed deferred with the result/failure).
- If the deferred has not yet completed, resets the logcontext before returning a
incomplete deferred. Then, when the deferred completes, restores the current
logcontext before running callbacks/errbacks.
This means the resultant deferred can be awaited without leaking the current
logcontext to the reactor (which would then get erroneously picked up by the next
thing the reactor does), and also means that the logcontext is preserved when the
deferred completes.
(This is more-or-less the opposite operation to run_in_background in terms of how it
handles log contexts.)
Pretty much equivalent to using `with PreserveLoggingContext():`, i.e. it clears the
logcontext before awaiting (and so before execution passes back to the reactor) and
restores the old context once the awaitable completes (execution passes from the
reactor back to the code).
"""
# The deferred has already completed
if deferred.called and not deferred.paused:
# it looks like this deferred is ready to run any callbacks we give it
# immediately. We may as well optimise out the logcontext faffery.
return deferred
# ok, we can't be sure that a yield won't block, so let's reset the
# logcontext, and add a callback to the deferred to restore it.
# Our goal is to have the caller logcontext unchanged after they yield/await the
# returned deferred.
#
# When the caller yield/await's the returned deferred, it may yield
# control back to the reactor. To avoid leaking the current logcontext to the
# reactor (which would then get erroneously picked up by the next thing the reactor
# does) while the deferred runs in the reactor event loop, we reset the logcontext
# and add a callback to the deferred to restore it so the caller's logcontext is
# active when the deferred completes.
prev_context = set_current_context(SENTINEL_CONTEXT)
deferred.addBoth(_set_context_cb, prev_context)
return deferred

View File

@@ -179,11 +179,13 @@ class MediaRepository:
# We get the media upload limits and sort them in descending order of
# time period, so that we can apply some optimizations.
self.media_upload_limits = hs.config.media.media_upload_limits
self.media_upload_limits.sort(
self.default_media_upload_limits = hs.config.media.media_upload_limits
self.default_media_upload_limits.sort(
key=lambda limit: limit.time_period_ms, reverse=True
)
self.media_repository_callbacks = hs.get_module_api_callbacks().media_repository
def _start_update_recently_accessed(self) -> Deferred:
return run_as_background_process(
"update_recently_accessed_media",
@@ -340,16 +342,27 @@ class MediaRepository:
# Check that the user has not exceeded any of the media upload limits.
# Use limits from module API if provided
media_upload_limits = (
await self.media_repository_callbacks.get_media_upload_limits_for_user(
auth_user.to_string()
)
)
# Otherwise use the default limits from config
if media_upload_limits is None:
# Note: the media upload limits are sorted so larger time periods are
# first.
media_upload_limits = self.default_media_upload_limits
# This is the total size of media uploaded by the user in the last
# `time_period_ms` milliseconds, or None if we haven't checked yet.
uploaded_media_size: Optional[int] = None
# Note: the media upload limits are sorted so larger time periods are
# first.
for limit in self.media_upload_limits:
for limit in media_upload_limits:
# We only need to check the amount of media uploaded by the user in
# this latest (smaller) time period if the amount of media uploaded
# in a previous (larger) time period is above the limit.
# in a previous (larger) time period is below the limit.
#
# This optimization means that in the common case where the user
# hasn't uploaded much media, we only need to query the database
@@ -363,6 +376,12 @@ class MediaRepository:
)
if uploaded_media_size + content_length > limit.max_bytes:
await self.media_repository_callbacks.on_media_upload_limit_exceeded(
user_id=auth_user.to_string(),
limit=limit,
sent_bytes=uploaded_media_size,
attempted_bytes=content_length,
)
raise SynapseError(
400, "Media upload limit exceeded", Codes.RESOURCE_LIMIT_EXCEEDED
)

View File

@@ -43,7 +43,7 @@ from typing import (
)
import attr
from pkg_resources import parse_version
from packaging.version import parse as parse_version
from prometheus_client import (
CollectorRegistry,
Counter,

View File

@@ -228,6 +228,11 @@ def run_as_background_process(
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
normal synapse async function).
Because the returned Deferred does not follow the synapse logcontext rules, awaiting
the result of this function will result in the log context being cleared (bad). In
order to properly await the result of this function and maintain the current log
context, use `make_deferred_yieldable`.
Args:
desc: a description for this background process type
server_name: The homeserver name that this background process is being run for
@@ -280,6 +285,20 @@ def run_as_background_process(
name=desc, **{SERVER_NAME_LABEL: server_name}
).dec()
# To explain how the log contexts work here:
# - When `run_as_background_process` is called, the current context is stored
# (using `PreserveLoggingContext`), we kick off the background task, and we
# restore the original context before returning (also part of
# `PreserveLoggingContext`).
# - The background task runs in its own new logcontext named after `desc`
# - When the background task finishes, we don't want to leak our background context
# into the reactor which would erroneously get attached to the next operation
# picked up by the event loop. We use `PreserveLoggingContext` to set the
# `sentinel` context and means the new `BackgroundProcessLoggingContext` will
# remember the `sentinel` context as its previous context to return to when it
# exits and yields control back to the reactor.
#
# TODO: Why can't we simplify to using `return run_in_background(run)`?
with PreserveLoggingContext():
# Note that we return a Deferred here so that it can be used in a
# looping_call and other places that expect a Deferred.

View File

@@ -50,6 +50,7 @@ from synapse.api.constants import ProfileFields
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
from synapse.config import ConfigError
from synapse.config.repository import MediaUploadLimit
from synapse.events import EventBase
from synapse.events.presence_router import (
GET_INTERESTED_USERS_CALLBACK,
@@ -94,7 +95,9 @@ from synapse.module_api.callbacks.account_validity_callbacks import (
)
from synapse.module_api.callbacks.media_repository_callbacks import (
GET_MEDIA_CONFIG_FOR_USER_CALLBACK,
GET_MEDIA_UPLOAD_LIMITS_FOR_USER_CALLBACK,
IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK,
ON_MEDIA_UPLOAD_LIMIT_EXCEEDED_CALLBACK,
)
from synapse.module_api.callbacks.ratelimit_callbacks import (
GET_RATELIMIT_OVERRIDE_FOR_USER_CALLBACK,
@@ -205,6 +208,7 @@ __all__ = [
"RoomAlias",
"UserProfile",
"RatelimitOverride",
"MediaUploadLimit",
]
logger = logging.getLogger(__name__)
@@ -462,6 +466,12 @@ class ModuleApi:
is_user_allowed_to_upload_media_of_size: Optional[
IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK
] = None,
get_media_upload_limits_for_user: Optional[
GET_MEDIA_UPLOAD_LIMITS_FOR_USER_CALLBACK
] = None,
on_media_upload_limit_exceeded: Optional[
ON_MEDIA_UPLOAD_LIMIT_EXCEEDED_CALLBACK
] = None,
) -> None:
"""Registers callbacks for media repository capabilities.
Added in Synapse v1.132.0.
@@ -469,6 +479,8 @@ class ModuleApi:
return self._callbacks.media_repository.register_callbacks(
get_media_config_for_user=get_media_config_for_user,
is_user_allowed_to_upload_media_of_size=is_user_allowed_to_upload_media_of_size,
get_media_upload_limits_for_user=get_media_upload_limits_for_user,
on_media_upload_limit_exceeded=on_media_upload_limit_exceeded,
)
def register_third_party_rules_callbacks(

View File

@@ -15,6 +15,7 @@
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional
from synapse.config.repository import MediaUploadLimit
from synapse.types import JsonDict
from synapse.util.async_helpers import delay_cancellation
from synapse.util.metrics import Measure
@@ -28,6 +29,14 @@ GET_MEDIA_CONFIG_FOR_USER_CALLBACK = Callable[[str], Awaitable[Optional[JsonDict
IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK = Callable[[str, int], Awaitable[bool]]
GET_MEDIA_UPLOAD_LIMITS_FOR_USER_CALLBACK = Callable[
[str], Awaitable[Optional[List[MediaUploadLimit]]]
]
ON_MEDIA_UPLOAD_LIMIT_EXCEEDED_CALLBACK = Callable[
[str, MediaUploadLimit, int, int], Awaitable[None]
]
class MediaRepositoryModuleApiCallbacks:
def __init__(self, hs: "HomeServer") -> None:
@@ -39,6 +48,12 @@ class MediaRepositoryModuleApiCallbacks:
self._is_user_allowed_to_upload_media_of_size_callbacks: List[
IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK
] = []
self._get_media_upload_limits_for_user_callbacks: List[
GET_MEDIA_UPLOAD_LIMITS_FOR_USER_CALLBACK
] = []
self._on_media_upload_limit_exceeded_callbacks: List[
ON_MEDIA_UPLOAD_LIMIT_EXCEEDED_CALLBACK
] = []
def register_callbacks(
self,
@@ -46,6 +61,12 @@ class MediaRepositoryModuleApiCallbacks:
is_user_allowed_to_upload_media_of_size: Optional[
IS_USER_ALLOWED_TO_UPLOAD_MEDIA_OF_SIZE_CALLBACK
] = None,
get_media_upload_limits_for_user: Optional[
GET_MEDIA_UPLOAD_LIMITS_FOR_USER_CALLBACK
] = None,
on_media_upload_limit_exceeded: Optional[
ON_MEDIA_UPLOAD_LIMIT_EXCEEDED_CALLBACK
] = None,
) -> None:
"""Register callbacks from module for each hook."""
if get_media_config_for_user is not None:
@@ -56,6 +77,16 @@ class MediaRepositoryModuleApiCallbacks:
is_user_allowed_to_upload_media_of_size
)
if get_media_upload_limits_for_user is not None:
self._get_media_upload_limits_for_user_callbacks.append(
get_media_upload_limits_for_user
)
if on_media_upload_limit_exceeded is not None:
self._on_media_upload_limit_exceeded_callbacks.append(
on_media_upload_limit_exceeded
)
async def get_media_config_for_user(self, user_id: str) -> Optional[JsonDict]:
for callback in self._get_media_config_for_user_callbacks:
with Measure(
@@ -83,3 +114,47 @@ class MediaRepositoryModuleApiCallbacks:
return res
return True
async def get_media_upload_limits_for_user(
self, user_id: str
) -> Optional[List[MediaUploadLimit]]:
"""
Get the first non-None list of MediaUploadLimits for the user from the registered callbacks.
If a list is returned it will be sorted in descending order of duration.
"""
for callback in self._get_media_upload_limits_for_user_callbacks:
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
res: Optional[List[MediaUploadLimit]] = await delay_cancellation(
callback(user_id)
)
if res is not None: # to allow [] to be returned meaning no limit
# We sort them in descending order of time period
res.sort(key=lambda limit: limit.time_period_ms, reverse=True)
return res
return None
async def on_media_upload_limit_exceeded(
self,
user_id: str,
limit: MediaUploadLimit,
sent_bytes: int,
attempted_bytes: int,
) -> None:
for callback in self._on_media_upload_limit_exceeded_callbacks:
with Measure(
self.clock,
name=f"{callback.__module__}.{callback.__qualname__}",
server_name=self.server_name,
):
# Use a copy of the data in case the module modifies it
limit_copy = MediaUploadLimit(
max_bytes=limit.max_bytes, time_period_ms=limit.time_period_ms
)
await delay_cancellation(
callback(user_id, limit_copy, sent_bytes, attempted_bytes)
)

View File

@@ -532,6 +532,7 @@ class Notifier:
StreamKeyType.TO_DEVICE,
StreamKeyType.TYPING,
StreamKeyType.UN_PARTIAL_STATED_ROOMS,
StreamKeyType.THREAD_SUBSCRIPTIONS,
],
new_token: int,
users: Optional[Collection[Union[str, UserID]]] = None,

View File

@@ -91,7 +91,7 @@ def _rule_to_template(rule: PushRule) -> Optional[Dict[str, Any]]:
unscoped_rule_id = _rule_id_from_namespaced(rule.rule_id)
template_name = _priority_class_to_template_name(rule.priority_class)
if template_name in ["override", "underride"]:
if template_name in ["override", "underride", "postcontent"]:
templaterule = {"conditions": rule.conditions, "actions": rule.actions}
elif template_name in ["sender", "room"]:
templaterule = {"actions": rule.actions}

View File

@@ -19,10 +19,14 @@
#
#
# Integer literals for push rule `kind`s
# This is used to store them in the database.
PRIORITY_CLASS_MAP = {
"underride": 1,
"sender": 2,
"room": 3,
# MSC4306
"postcontent": 6,
"content": 4,
"override": 5,
}

View File

@@ -44,6 +44,7 @@ from synapse.replication.tcp.streams import (
UnPartialStatedEventStream,
UnPartialStatedRoomStream,
)
from synapse.replication.tcp.streams._base import ThreadSubscriptionsStream
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
@@ -255,6 +256,12 @@ class ReplicationDataHandler:
self._state_storage_controller.notify_event_un_partial_stated(
row.event_id
)
elif stream_name == ThreadSubscriptionsStream.NAME:
self.notifier.on_new_event(
StreamKeyType.THREAD_SUBSCRIPTIONS,
token,
users=[row.user_id for row in rows],
)
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows

View File

@@ -92,9 +92,9 @@ class ExperimentalFeaturesRestServlet(RestServlet):
user_features = {}
for feature in ExperimentalFeature:
if feature in enabled_features:
user_features[feature] = True
user_features[feature.value] = True
else:
user_features[feature] = False
user_features[feature.value] = False
return HTTPStatus.OK, {"features": user_features}
async def on_PUT(

View File

@@ -109,6 +109,12 @@ class ProfileFieldRestServlet(RestServlet):
self.hs = hs
self.profile_handler = hs.get_profile_handler()
self.auth = hs.get_auth()
if hs.config.experimental.msc4133_enabled:
self.PATTERNS.append(
re.compile(
r"^/_matrix/client/unstable/uk\.tcpip\.msc4133/profile/(?P<user_id>[^/]*)/(?P<field_name>[^/]*)"
)
)
async def on_GET(
self, request: SynapseRequest, user_id: str, field_name: str

View File

@@ -19,9 +19,11 @@
#
#
from http import HTTPStatus
from typing import TYPE_CHECKING, List, Tuple, Union
from synapse.api.errors import (
Codes,
NotFoundError,
StoreError,
SynapseError,
@@ -239,6 +241,15 @@ def _rule_spec_from_path(path: List[str]) -> RuleSpec:
def _rule_tuple_from_request_object(
rule_template: str, rule_id: str, req_obj: JsonDict
) -> Tuple[List[JsonDict], List[Union[str, JsonDict]]]:
if rule_template == "postcontent":
# postcontent is from MSC4306, which says that clients
# cannot create their own postcontent rules right now.
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"user-defined rules using `postcontent` are not accepted",
errcode=Codes.INVALID_PARAM,
)
if rule_template in ["override", "underride"]:
if "conditions" not in req_obj:
raise InvalidRuleException("Missing 'conditions'")

View File

@@ -23,6 +23,8 @@ import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
import attr
from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.api.filtering import FilterCollection
@@ -42,7 +44,6 @@ from synapse.handlers.sync import (
KnockedSyncResult,
SyncConfig,
SyncResult,
SyncVersion,
)
from synapse.http.server import HttpServer
from synapse.http.servlet import (
@@ -267,7 +268,6 @@ class SyncRestServlet(RestServlet):
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
request_key,
since_token=since_token,
timeout=timeout,
@@ -632,185 +632,23 @@ class SyncRestServlet(RestServlet):
return result
class SlidingSyncE2eeRestServlet(RestServlet):
"""
API endpoint for MSC3575 Sliding Sync `/sync/e2ee`. This is being introduced as part
of Sliding Sync but doesn't have any sliding window component. It's just a way to
get E2EE events without having to sit through a big initial sync (`/sync` v2). And
we can avoid encryption events being backed up by the main sync response.
Having To-Device messages split out to this sync endpoint also helps when clients
need to have 2 or more sync streams open at a time, e.g a push notification process
and a main process. This can cause the two processes to race to fetch the To-Device
events, resulting in the need for complex synchronisation rules to ensure the token
is correctly and atomically exchanged between processes.
GET parameters::
timeout(int): How long to wait for new events in milliseconds.
since(batch_token): Batch token when asking for incremental deltas.
Response JSON::
{
"next_batch": // batch token for the next /sync
"to_device": {
// list of to-device events
"events": [
{
"content: { "algorithm": "m.olm.v1.curve25519-aes-sha2", "ciphertext": { ... }, "org.matrix.msgid": "abcd", "session_id": "abcd" },
"type": "m.room.encrypted",
"sender": "@alice:example.com",
}
// ...
]
},
"device_lists": {
"changed": ["@alice:example.com"],
"left": ["@bob:example.com"]
},
"device_one_time_keys_count": {
"signed_curve25519": 50
},
"device_unused_fallback_key_types": [
"signed_curve25519"
]
}
"""
PATTERNS = client_patterns(
"/org.matrix.msc3575/sync/e2ee$", releases=[], v1=False, unstable=True
)
def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.sync_handler = hs.get_sync_handler()
# Filtering only matters for the `device_lists` because it requires a bunch of
# derived information from rooms (see how `_generate_sync_entry_for_rooms()`
# prepares a bunch of data for `_generate_sync_entry_for_device_list()`).
self.only_member_events_filter_collection = FilterCollection(
self.hs,
{
"room": {
# We only care about membership events for the `device_lists`.
# Membership will tell us whether a user has joined/left a room and
# if there are new devices to encrypt for.
"timeline": {
"types": ["m.room.member"],
},
"state": {
"types": ["m.room.member"],
},
# We don't want any extra account_data generated because it's not
# returned by this endpoint. This helps us avoid work in
# `_generate_sync_entry_for_rooms()`
"account_data": {
"not_types": ["*"],
},
# We don't want any extra ephemeral data generated because it's not
# returned by this endpoint. This helps us avoid work in
# `_generate_sync_entry_for_rooms()`
"ephemeral": {
"not_types": ["*"],
},
},
# We don't want any extra account_data generated because it's not
# returned by this endpoint. (This is just here for good measure)
"account_data": {
"not_types": ["*"],
},
# We don't want any extra presence data generated because it's not
# returned by this endpoint. (This is just here for good measure)
"presence": {
"not_types": ["*"],
},
},
)
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req_experimental_feature(
request, allow_guest=True, feature=ExperimentalFeature.MSC3575
)
user = requester.user
device_id = requester.device_id
timeout = parse_integer(request, "timeout", default=0)
since = parse_string(request, "since")
sync_config = SyncConfig(
user=user,
filter_collection=self.only_member_events_filter_collection,
is_guest=requester.is_guest,
device_id=device_id,
use_state_after=False, # We don't return any rooms so this flag is a no-op
)
since_token = None
if since is not None:
since_token = await StreamToken.from_string(self.store, since)
# Request cache key
request_key = (
SyncVersion.E2EE_SYNC,
user,
timeout,
since,
)
# Gather data for the response
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.E2EE_SYNC,
request_key,
since_token=since_token,
timeout=timeout,
full_state=False,
)
# The client may have disconnected by now; don't bother to serialize the
# response if so.
if request._disconnected:
logger.info("Client has disconnected; not serializing response.")
return 200, {}
response: JsonDict = defaultdict(dict)
response["next_batch"] = await sync_result.next_batch.to_string(self.store)
if sync_result.to_device:
response["to_device"] = {"events": sync_result.to_device}
if sync_result.device_lists.changed:
response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
if sync_result.device_lists.left:
response["device_lists"]["left"] = list(sync_result.device_lists.left)
# We always include this because https://github.com/vector-im/element-android/issues/3725
# The spec isn't terribly clear on when this can be omitted and how a client would tell
# the difference between "no keys present" and "nothing changed" in terms of whole field
# absent / individual key type entry absent
# Corresponding synapse issue: https://github.com/matrix-org/synapse/issues/10456
response["device_one_time_keys_count"] = sync_result.device_one_time_keys_count
# https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
# states that this field should always be included, as long as the server supports the feature.
response["device_unused_fallback_key_types"] = (
sync_result.device_unused_fallback_key_types
)
return 200, response
class SlidingSyncRestServlet(RestServlet):
"""
API endpoint for MSC3575 Sliding Sync `/sync`. Allows for clients to request a
API endpoint for MSC4186 Simplified Sliding Sync `/sync`, which was historically derived
from MSC3575 (Sliding Sync; now abandoned). Allows for clients to request a
subset (sliding window) of rooms, state, and timeline events (just what they need)
in order to bootstrap quickly and subscribe to only what the client cares about.
Because the client can specify what it cares about, we can respond quickly and skip
all of the work we would normally have to do with a sync v2 response.
Extensions of various features are defined in:
- to-device messaging (MSC3885)
- end-to-end encryption (MSC3884)
- typing notifications (MSC3961)
- receipts (MSC3960)
- account data (MSC3959)
- thread subscriptions (MSC4308)
Request query parameters:
timeout: How long to wait for new events in milliseconds.
pos: Stream position token when asking for incremental deltas.
@@ -1247,11 +1085,49 @@ class SlidingSyncRestServlet(RestServlet):
"rooms": extensions.typing.room_id_to_typing_map,
}
# excludes both None and falsy `thread_subscriptions`
if extensions.thread_subscriptions:
serialized_extensions["io.element.msc4308.thread_subscriptions"] = (
_serialise_thread_subscriptions(extensions.thread_subscriptions)
)
return serialized_extensions
def _serialise_thread_subscriptions(
thread_subscriptions: SlidingSyncResult.Extensions.ThreadSubscriptionsExtension,
) -> JsonDict:
out: JsonDict = {}
if thread_subscriptions.subscribed:
out["subscribed"] = {
room_id: {
thread_root_id: attr.asdict(
change, filter=lambda _attr, v: v is not None
)
for thread_root_id, change in room_threads.items()
}
for room_id, room_threads in thread_subscriptions.subscribed.items()
}
if thread_subscriptions.unsubscribed:
out["unsubscribed"] = {
room_id: {
thread_root_id: attr.asdict(
change, filter=lambda _attr, v: v is not None
)
for thread_root_id, change in room_threads.items()
}
for room_id, room_threads in thread_subscriptions.unsubscribed.items()
}
if thread_subscriptions.prev_batch:
out["prev_batch"] = thread_subscriptions.prev_batch.to_string()
return out
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)
SlidingSyncRestServlet(hs).register(http_server)
SlidingSyncE2eeRestServlet(hs).register(http_server)

View File

@@ -1,21 +1,39 @@
from http import HTTPStatus
from typing import TYPE_CHECKING, Optional, Tuple
from typing import TYPE_CHECKING, Dict, Optional, Tuple
import attr
from typing_extensions import TypeAlias
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.server import HttpServer
from synapse.http.servlet import (
RestServlet,
parse_and_validate_json_object_from_request,
parse_integer,
parse_string,
)
from synapse.http.site import SynapseRequest
from synapse.rest.client._base import client_patterns
from synapse.types import JsonDict, RoomID
from synapse.types import (
JsonDict,
RoomID,
SlidingSyncStreamToken,
ThreadSubscriptionsToken,
)
from synapse.types.handlers.sliding_sync import SlidingSyncResult
from synapse.types.rest import RequestBodyModel
from synapse.util.pydantic_models import AnyEventId
if TYPE_CHECKING:
from synapse.server import HomeServer
_ThreadSubscription: TypeAlias = (
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadSubscription
)
_ThreadUnsubscription: TypeAlias = (
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadUnsubscription
)
class ThreadSubscriptionsRestServlet(RestServlet):
PATTERNS = client_patterns(
@@ -100,6 +118,130 @@ class ThreadSubscriptionsRestServlet(RestServlet):
return HTTPStatus.OK, {}
class ThreadSubscriptionsPaginationRestServlet(RestServlet):
PATTERNS = client_patterns(
"/io.element.msc4308/thread_subscriptions$",
unstable=True,
releases=(),
)
CATEGORY = "Thread Subscriptions requests (unstable)"
# Maximum number of thread subscriptions to return in one request.
MAX_LIMIT = 512
def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.is_mine = hs.is_mine
self.store = hs.get_datastores().main
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
limit = min(
parse_integer(request, "limit", default=100, negative=False),
ThreadSubscriptionsPaginationRestServlet.MAX_LIMIT,
)
from_end_opt = parse_string(request, "from", required=False)
to_start_opt = parse_string(request, "to", required=False)
_direction = parse_string(request, "dir", required=True, allowed_values=("b",))
if limit <= 0:
# condition needed because `negative=False` still allows 0
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"limit must be greater than 0",
errcode=Codes.INVALID_PARAM,
)
if from_end_opt is not None:
try:
# because of backwards pagination, the `from` token is actually the
# bound closest to the end of the stream
end_stream_id = ThreadSubscriptionsToken.from_string(
from_end_opt
).stream_id
except ValueError:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"`from` is not a valid token",
errcode=Codes.INVALID_PARAM,
)
else:
end_stream_id = self.store.get_max_thread_subscriptions_stream_id()
if to_start_opt is not None:
# because of backwards pagination, the `to` token is actually the
# bound closest to the start of the stream
try:
start_stream_id = ThreadSubscriptionsToken.from_string(
to_start_opt
).stream_id
except ValueError:
# we also accept sliding sync `pos` tokens on this parameter
try:
sliding_sync_pos = await SlidingSyncStreamToken.from_string(
self.store, to_start_opt
)
start_stream_id = (
sliding_sync_pos.stream_token.thread_subscriptions_key
)
except ValueError:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"`to` is not a valid token",
errcode=Codes.INVALID_PARAM,
)
else:
# the start of time is ID 1; the lower bound is exclusive though
start_stream_id = 0
subscriptions = (
await self.store.get_latest_updated_thread_subscriptions_for_user(
requester.user.to_string(),
from_id=start_stream_id,
to_id=end_stream_id,
limit=limit,
)
)
subscribed_threads: Dict[str, Dict[str, JsonDict]] = {}
unsubscribed_threads: Dict[str, Dict[str, JsonDict]] = {}
for stream_id, room_id, thread_root_id, subscribed, automatic in subscriptions:
if subscribed:
subscribed_threads.setdefault(room_id, {})[thread_root_id] = (
attr.asdict(
_ThreadSubscription(
automatic=automatic,
bump_stamp=stream_id,
)
)
)
else:
unsubscribed_threads.setdefault(room_id, {})[thread_root_id] = (
attr.asdict(_ThreadUnsubscription(bump_stamp=stream_id))
)
result: JsonDict = {}
if subscribed_threads:
result["subscribed"] = subscribed_threads
if unsubscribed_threads:
result["unsubscribed"] = unsubscribed_threads
if len(subscriptions) == limit:
# We hit the limit, so there might be more entries to return.
# Generate a new token that has moved backwards, ready for the next
# request.
min_returned_stream_id, _, _, _, _ = subscriptions[0]
result["end"] = ThreadSubscriptionsToken(
# We subtract one because the 'later in the stream' bound is inclusive,
# and we already saw the element at index 0.
stream_id=min_returned_stream_id - 1
).to_string()
return HTTPStatus.OK, result
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if hs.config.experimental.msc4306_enabled:
ThreadSubscriptionsRestServlet(hs).register(http_server)
ThreadSubscriptionsPaginationRestServlet(hs).register(http_server)

View File

@@ -175,6 +175,7 @@ class VersionsRestServlet(RestServlet):
"org.matrix.simplified_msc3575": msc3575_enabled,
# Arbitrary key-value profile fields.
"uk.tcpip.msc4133": self.config.experimental.msc4133_enabled,
"uk.tcpip.msc4133.stable": True,
# MSC4155: Invite filtering
"org.matrix.msc4155": self.config.experimental.msc4155_enabled,
# MSC4306: Support for thread subscriptions

View File

@@ -366,12 +366,6 @@ class HomeServer(metaclass=abc.ABCMeta):
self.datastores = Databases(self.DATASTORE_CLASS, self)
logger.info("Finished setting up.")
# Register background tasks required by this server. This must be done
# somewhat manually due to the background tasks not being registered
# unless handlers are instantiated.
if self.config.worker.run_background_tasks:
self.setup_background_tasks()
def __del__(self) -> None:
"""
Called when an the homeserver is garbage collected.
@@ -410,7 +404,7 @@ class HomeServer(metaclass=abc.ABCMeta):
appropriate listeners.
"""
def setup_background_tasks(self) -> None:
def start_background_tasks(self) -> None:
"""
Some handlers have side effects on instantiation (like registering
background updates). This function causes them to be fetched, and

View File

@@ -2653,8 +2653,7 @@ def make_in_list_sql_clause(
# These overloads ensure that `columns` and `iterable` values have the same length.
# Suppress "Single overload definition, multiple required" complaint.
@overload # type: ignore[misc]
@overload
def make_tuple_in_list_sql_clause(
database_engine: BaseDatabaseEngine,
columns: Tuple[str, str],
@@ -2662,6 +2661,14 @@ def make_tuple_in_list_sql_clause(
) -> Tuple[str, list]: ...
@overload
def make_tuple_in_list_sql_clause(
database_engine: BaseDatabaseEngine,
columns: Tuple[str, str, str],
iterable: Collection[Tuple[Any, Any, Any]],
) -> Tuple[str, list]: ...
def make_tuple_in_list_sql_clause(
database_engine: BaseDatabaseEngine,
columns: Tuple[str, ...],

View File

@@ -21,6 +21,7 @@
import itertools
import json
import logging
from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple
@@ -62,6 +63,12 @@ PURGE_HISTORY_CACHE_NAME = "ph_cache_fake"
# As above, but for invalidating room caches on room deletion
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"
# This cache takes a list of tuples as its first argument, which requires
# special handling.
GET_E2E_CROSS_SIGNING_SIGNATURES_FOR_DEVICE_CACHE_NAME = (
"_get_e2e_cross_signing_signatures_for_device"
)
# How long between cache invalidation table cleanups, once we have caught up
# with the backlog.
REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h")
@@ -270,6 +277,33 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
# room membership.
#
# self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
elif (
row.cache_func
== GET_E2E_CROSS_SIGNING_SIGNATURES_FOR_DEVICE_CACHE_NAME
):
# "keys" is a list of strings, where each string is a
# JSON-encoded representation of the tuple keys, i.e.
# keys: ['["@userid:domain", "DEVICEID"]','["@userid2:domain", "DEVICEID2"]']
#
# This is a side-effect of not being able to send nested
# information over replication.
for json_str in row.keys:
try:
user_id, device_id = json.loads(json_str)
except (json.JSONDecodeError, TypeError):
logger.error(
"Failed to deserialise cache key as valid JSON: %s",
json_str,
)
continue
# Invalidate each key.
#
# Note: .invalidate takes a tuple of arguments, hence the need
# to nest our tuple in another tuple.
self._get_e2e_cross_signing_signatures_for_device.invalidate( # type: ignore[attr-defined]
((user_id, device_id),)
)
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)

View File

@@ -20,6 +20,7 @@
#
#
import abc
import json
from typing import (
TYPE_CHECKING,
Any,
@@ -354,15 +355,17 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
)
for batch in batch_iter(signature_query, 50):
cross_sigs_result = await self.db_pool.runInteraction(
"get_e2e_cross_signing_signatures_for_devices",
self._get_e2e_cross_signing_signatures_for_devices_txn,
batch,
cross_sigs_result = (
await self._get_e2e_cross_signing_signatures_for_devices(batch)
)
# add each cross-signing signature to the correct device in the result dict.
for user_id, key_id, device_id, signature in cross_sigs_result:
for (
user_id,
device_id,
), signature_list in cross_sigs_result.items():
target_device_result = result[user_id][device_id]
# We've only looked up cross-signatures for non-deleted devices with key
# data.
assert target_device_result is not None
@@ -373,7 +376,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
signing_user_signatures = target_device_signatures.setdefault(
user_id, {}
)
signing_user_signatures[key_id] = signature
for key_id, signature in signature_list:
signing_user_signatures[key_id] = signature
log_kv(result)
return result
@@ -479,41 +484,83 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
return result
def _get_e2e_cross_signing_signatures_for_devices_txn(
self, txn: LoggingTransaction, device_query: Iterable[Tuple[str, str]]
) -> List[Tuple[str, str, str, str]]:
"""Get cross-signing signatures for a given list of devices
Returns signatures made by the owners of the devices.
Returns: a list of results; each entry in the list is a tuple of
(user_id, key_id, target_device_id, signature).
@cached()
def _get_e2e_cross_signing_signatures_for_device(
self,
user_id_and_device_id: Tuple[str, str],
) -> Sequence[Tuple[str, str]]:
"""
signature_query_clauses = []
signature_query_params = []
The single-item version of `_get_e2e_cross_signing_signatures_for_devices`.
See @cachedList for why a separate method is needed.
"""
raise NotImplementedError()
for user_id, device_id in device_query:
signature_query_clauses.append(
"target_user_id = ? AND target_device_id = ? AND user_id = ?"
@cachedList(
cached_method_name="_get_e2e_cross_signing_signatures_for_device",
list_name="device_query",
)
async def _get_e2e_cross_signing_signatures_for_devices(
self, device_query: Iterable[Tuple[str, str]]
) -> Mapping[Tuple[str, str], Sequence[Tuple[str, str]]]:
"""Get cross-signing signatures for a given list of user IDs and devices.
Args:
An iterable containing tuples of (user ID, device ID).
Returns:
A mapping of results. The keys are the original (user_id, device_id)
tuple, while the value is the matching list of tuples of
(key_id, signature). The value will be an empty list if no
signatures exist for the device.
Given this method is annotated with `@cachedList`, the return dict's
keys match the tuples within `device_query`, so that cache entries can
be computed from the corresponding values.
As results are cached, the return type is immutable.
"""
def _get_e2e_cross_signing_signatures_for_devices_txn(
txn: LoggingTransaction, device_query: Iterable[Tuple[str, str]]
) -> Mapping[Tuple[str, str], Sequence[Tuple[str, str]]]:
where_clause_sql, where_clause_params = make_tuple_in_list_sql_clause(
self.database_engine,
columns=("target_user_id", "target_device_id", "user_id"),
iterable=[
(user_id, device_id, user_id) for user_id, device_id in device_query
],
)
signature_query_params.extend([user_id, device_id, user_id])
signature_sql = """
SELECT user_id, key_id, target_device_id, signature
FROM e2e_cross_signing_signatures WHERE %s
""" % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
signature_sql = f"""
SELECT user_id, key_id, target_device_id, signature
FROM e2e_cross_signing_signatures WHERE {where_clause_sql}
"""
txn.execute(signature_sql, signature_query_params)
return cast(
List[
Tuple[
str,
str,
str,
str,
]
],
txn.fetchall(),
txn.execute(signature_sql, where_clause_params)
devices_and_signatures: Dict[Tuple[str, str], List[Tuple[str, str]]] = {}
# `@cachedList` requires we return one key for every item in `device_query`.
# Pre-populate `devices_and_signatures` with each key so that none are missing.
#
# If any are missing, they will be cached as `None`, which is not
# what callers expected.
for user_id, device_id in device_query:
devices_and_signatures.setdefault((user_id, device_id), [])
# Populate the return dictionary with each found key_id and signature.
for user_id, key_id, target_device_id, signature in txn.fetchall():
signature_tuple = (key_id, signature)
devices_and_signatures[(user_id, target_device_id)].append(
signature_tuple
)
return devices_and_signatures
return await self.db_pool.runInteraction(
"_get_e2e_cross_signing_signatures_for_devices_txn",
_get_e2e_cross_signing_signatures_for_devices_txn,
device_query,
)
async def get_e2e_one_time_keys(
@@ -1772,26 +1819,71 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
user_id: the user who made the signatures
signatures: signatures to add
"""
await self.db_pool.simple_insert_many(
"e2e_cross_signing_signatures",
keys=(
"user_id",
"key_id",
"target_user_id",
"target_device_id",
"signature",
),
values=[
(
user_id,
item.signing_key_id,
item.target_user_id,
item.target_device_id,
item.signature,
)
def _store_e2e_cross_signing_signatures(
txn: LoggingTransaction,
signatures: "Iterable[SignatureListItem]",
) -> None:
self.db_pool.simple_insert_many_txn(
txn,
"e2e_cross_signing_signatures",
keys=(
"user_id",
"key_id",
"target_user_id",
"target_device_id",
"signature",
),
values=[
(
user_id,
item.signing_key_id,
item.target_user_id,
item.target_device_id,
item.signature,
)
for item in signatures
],
)
to_invalidate = [
# Each entry is a tuple of arguments to
# `_get_e2e_cross_signing_signatures_for_device`, which
# itself takes a tuple. Hence the double-tuple.
((user_id, item.target_device_id),)
for item in signatures
],
desc="add_e2e_signing_key",
]
if to_invalidate:
# Invalidate the local cache of this worker.
for cache_key in to_invalidate:
txn.call_after(
self._get_e2e_cross_signing_signatures_for_device.invalidate,
cache_key,
)
# Stream cache invalidate keys over replication.
#
# We can only send a primitive per function argument across
# replication.
#
# Encode the array of strings as a JSON string, and we'll unpack
# it on the other side.
to_send = [
(json.dumps([user_id, item.target_device_id]),)
for item in signatures
]
self._send_invalidation_to_replication_bulk(
txn,
cache_name=self._get_e2e_cross_signing_signatures_for_device.__name__,
key_tuples=to_send,
)
await self.db_pool.runInteraction(
"add_e2e_signing_key",
_store_e2e_cross_signing_signatures,
signatures,
)

View File

@@ -53,7 +53,7 @@ from synapse.storage.databases.main.stream import (
generate_pagination_where_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, MultiWriterStreamToken, StreamKeyType, StreamToken
from synapse.types import JsonDict, StreamKeyType, StreamToken
from synapse.util.caches.descriptors import cached, cachedList
if TYPE_CHECKING:
@@ -316,17 +316,8 @@ class RelationsWorkerStore(SQLBaseStore):
StreamKeyType.ROOM, next_key
)
else:
next_token = StreamToken(
room_key=next_key,
presence_key=0,
typing_key=0,
receipt_key=MultiWriterStreamToken(stream=0),
account_data_key=0,
push_rules_key=0,
to_device_key=0,
device_list_key=MultiWriterStreamToken(stream=0),
groups_key=0,
un_partial_stated_rooms_key=0,
next_token = StreamToken.START.copy_and_replace(
StreamKeyType.ROOM, next_key
)
return events[:limit], next_token

View File

@@ -492,7 +492,7 @@ class PerConnectionStateDB:
"""An equivalent to `PerConnectionState` that holds data in a format stored
in the DB.
The principle difference is that the tokens for the different streams are
The principal difference is that the tokens for the different streams are
serialized to strings.
When persisting this *only* contains updates to the state.

View File

@@ -505,6 +505,9 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
"""
return self._thread_subscriptions_id_gen.get_current_token()
def get_thread_subscriptions_stream_id_generator(self) -> MultiWriterIdGenerator:
return self._thread_subscriptions_id_gen
async def get_updated_thread_subscriptions(
self, *, from_id: int, to_id: int, limit: int
) -> List[Tuple[int, str, str, str]]:
@@ -538,34 +541,52 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
get_updated_thread_subscriptions_txn,
)
async def get_updated_thread_subscriptions_for_user(
async def get_latest_updated_thread_subscriptions_for_user(
self, user_id: str, *, from_id: int, to_id: int, limit: int
) -> List[Tuple[int, str, str]]:
"""Get updates to thread subscriptions for a specific user.
) -> List[Tuple[int, str, str, bool, Optional[bool]]]:
"""Get the latest updates to thread subscriptions for a specific user.
Args:
user_id: The ID of the user
from_id: The starting stream ID (exclusive)
to_id: The ending stream ID (inclusive)
limit: The maximum number of rows to return
If there are too many rows to return, rows from the start (closer to `from_id`)
will be omitted.
Returns:
A list of (stream_id, room_id, thread_root_event_id) tuples.
A list of (stream_id, room_id, thread_root_event_id, subscribed, automatic) tuples.
The row with lowest `stream_id` is the first row.
"""
def get_updated_thread_subscriptions_for_user_txn(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str]]:
) -> List[Tuple[int, str, str, bool, Optional[bool]]]:
sql = """
SELECT stream_id, room_id, event_id
FROM thread_subscriptions
WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
WITH the_updates AS (
SELECT stream_id, room_id, event_id, subscribed, automatic
FROM thread_subscriptions
WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id DESC
LIMIT ?
)
SELECT stream_id, room_id, event_id, subscribed, automatic
FROM the_updates
ORDER BY stream_id ASC
LIMIT ?
"""
txn.execute(sql, (user_id, from_id, to_id, limit))
return [(row[0], row[1], row[2]) for row in txn]
return [
(
stream_id,
room_id,
event_id,
# SQLite integer to boolean conversions
bool(subscribed),
bool(automatic) if subscribed else None,
)
for (stream_id, room_id, event_id, subscribed, automatic) in txn
]
return await self.db_pool.runInteraction(
"get_updated_thread_subscriptions_for_user",

View File

@@ -0,0 +1,19 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Work around https://github.com/element-hq/synapse/issues/18712 by advancing the
-- stream sequence.
-- This makes last_value of the sequence point to a position that will not get later
-- returned by nextval.
-- (For blank thread subscription streams, this means last_value = 2, nextval() = 3 after this line.)
SELECT nextval('thread_subscriptions_sequence');

View File

@@ -187,8 +187,12 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
Warning: Streams using this generator start at ID 2, because ID 1 is always assumed
to have been 'seen as persisted'.
Unclear if this extant behaviour is desirable for some reason.
When creating a new sequence for a new stream,
it will be necessary to use `START WITH 2`.
When creating a new sequence for a new stream, it will be necessary to advance it
so that position 1 is consumed.
DO NOT USE `START WITH 2` FOR THIS PURPOSE:
see https://github.com/element-hq/synapse/issues/18712
Instead, use `SELECT nextval('sequence_name');` immediately after the
`CREATE SEQUENCE` statement.
Args:
db_conn

View File

@@ -33,7 +33,6 @@ from synapse.logging.opentracing import trace
from synapse.streams import EventSource
from synapse.types import (
AbstractMultiWriterStreamToken,
MultiWriterStreamToken,
StreamKeyType,
StreamToken,
)
@@ -84,6 +83,7 @@ class EventSources:
un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token(
self._instance_name
)
thread_subscriptions_key = self.store.get_max_thread_subscriptions_stream_id()
token = StreamToken(
room_key=self.sources.room.get_current_key(),
@@ -97,6 +97,7 @@ class EventSources:
# Groups key is unused.
groups_key=0,
un_partial_stated_rooms_key=un_partial_stated_rooms_key,
thread_subscriptions_key=thread_subscriptions_key,
)
return token
@@ -123,6 +124,7 @@ class EventSources:
StreamKeyType.TO_DEVICE: self.store.get_to_device_id_generator(),
StreamKeyType.DEVICE_LIST: self.store.get_device_stream_id_generator(),
StreamKeyType.UN_PARTIAL_STATED_ROOMS: self.store.get_un_partial_stated_rooms_id_generator(),
StreamKeyType.THREAD_SUBSCRIPTIONS: self.store.get_thread_subscriptions_stream_id_generator(),
}
for _, key in StreamKeyType.__members__.items():
@@ -195,16 +197,7 @@ class EventSources:
Returns:
The current token for pagination.
"""
token = StreamToken(
room_key=await self.sources.room.get_current_key_for_room(room_id),
presence_key=0,
typing_key=0,
receipt_key=MultiWriterStreamToken(stream=0),
account_data_key=0,
push_rules_key=0,
to_device_key=0,
device_list_key=MultiWriterStreamToken(stream=0),
groups_key=0,
un_partial_stated_rooms_key=0,
return StreamToken.START.copy_and_replace(
StreamKeyType.ROOM,
await self.sources.room.get_current_key_for_room(room_id),
)
return token

View File

@@ -996,6 +996,7 @@ class StreamKeyType(Enum):
TO_DEVICE = "to_device_key"
DEVICE_LIST = "device_list_key"
UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
THREAD_SUBSCRIPTIONS = "thread_subscriptions_key"
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -1003,7 +1004,7 @@ class StreamToken:
"""A collection of keys joined together by underscores in the following
order and which represent the position in their respective streams.
ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379`
ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379_4242`
1. `room_key`: `s2633508` which is a `RoomStreamToken`
- `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59`
- See the docstring for `RoomStreamToken` for more details.
@@ -1016,6 +1017,7 @@ class StreamToken:
8. `device_list_key`: `265584`
9. `groups_key`: `1` (note that this key is now unused)
10. `un_partial_stated_rooms_key`: `379`
11. `thread_subscriptions_key`: 4242
You can see how many of these keys correspond to the various
fields in a "/sync" response:
@@ -1074,6 +1076,7 @@ class StreamToken:
# Note that the groups key is no longer used and may have bogus values.
groups_key: int
un_partial_stated_rooms_key: int
thread_subscriptions_key: int
_SEPARATOR = "_"
START: ClassVar["StreamToken"]
@@ -1101,6 +1104,7 @@ class StreamToken:
device_list_key,
groups_key,
un_partial_stated_rooms_key,
thread_subscriptions_key,
) = keys
return cls(
@@ -1116,6 +1120,7 @@ class StreamToken:
),
groups_key=int(groups_key),
un_partial_stated_rooms_key=int(un_partial_stated_rooms_key),
thread_subscriptions_key=int(thread_subscriptions_key),
)
except CancelledError:
raise
@@ -1138,6 +1143,7 @@ class StreamToken:
# if additional tokens are added.
str(self.groups_key),
str(self.un_partial_stated_rooms_key),
str(self.thread_subscriptions_key),
]
)
@@ -1202,6 +1208,7 @@ class StreamToken:
StreamKeyType.TO_DEVICE,
StreamKeyType.TYPING,
StreamKeyType.UN_PARTIAL_STATED_ROOMS,
StreamKeyType.THREAD_SUBSCRIPTIONS,
],
) -> int: ...
@@ -1257,7 +1264,8 @@ class StreamToken:
f"typing: {self.typing_key}, receipt: {self.receipt_key}, "
f"account_data: {self.account_data_key}, push_rules: {self.push_rules_key}, "
f"to_device: {self.to_device_key}, device_list: {self.device_list_key}, "
f"groups: {self.groups_key}, un_partial_stated_rooms: {self.un_partial_stated_rooms_key})"
f"groups: {self.groups_key}, un_partial_stated_rooms: {self.un_partial_stated_rooms_key},"
f"thread_subscriptions: {self.thread_subscriptions_key})"
)
@@ -1272,6 +1280,7 @@ StreamToken.START = StreamToken(
device_list_key=MultiWriterStreamToken(stream=0),
groups_key=0,
un_partial_stated_rooms_key=0,
thread_subscriptions_key=0,
)
@@ -1318,6 +1327,27 @@ class SlidingSyncStreamToken:
return f"{self.connection_position}/{stream_token_str}"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ThreadSubscriptionsToken:
"""
Token for a position in the thread subscriptions stream.
Format: `ts<stream_id>`
"""
stream_id: int
@staticmethod
def from_string(s: str) -> "ThreadSubscriptionsToken":
if not s.startswith("ts"):
raise ValueError("thread subscription token must start with `ts`")
return ThreadSubscriptionsToken(stream_id=int(s[2:]))
def to_string(self) -> str:
return f"ts{self.stream_id}"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class PersistedPosition:
"""Position of a newly persisted row with instance that persisted it."""

View File

@@ -50,6 +50,7 @@ from synapse.types import (
SlidingSyncStreamToken,
StrCollection,
StreamToken,
ThreadSubscriptionsToken,
UserID,
)
from synapse.types.rest.client import SlidingSyncBody
@@ -357,11 +358,50 @@ class SlidingSyncResult:
def __bool__(self) -> bool:
return bool(self.room_id_to_typing_map)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ThreadSubscriptionsExtension:
"""The Thread Subscriptions extension (MSC4308)
Attributes:
subscribed: map (room_id -> thread_root_id -> info) of new or changed subscriptions
unsubscribed: map (room_id -> thread_root_id -> info) of new unsubscriptions
prev_batch: if present, there is a gap and the client can use this token to backpaginate
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ThreadSubscription:
# always present when `subscribed`
automatic: Optional[bool]
# the same as our stream_id; useful for clients to resolve
# race conditions locally
bump_stamp: int
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ThreadUnsubscription:
# the same as our stream_id; useful for clients to resolve
# race conditions locally
bump_stamp: int
# room_id -> event_id (of thread root) -> the subscription change
subscribed: Optional[Mapping[str, Mapping[str, ThreadSubscription]]]
# room_id -> event_id (of thread root) -> the unsubscription
unsubscribed: Optional[Mapping[str, Mapping[str, ThreadUnsubscription]]]
prev_batch: Optional[ThreadSubscriptionsToken]
def __bool__(self) -> bool:
return (
bool(self.subscribed)
or bool(self.unsubscribed)
or bool(self.prev_batch)
)
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
typing: Optional[TypingExtension] = None
thread_subscriptions: Optional[ThreadSubscriptionsExtension] = None
def __bool__(self) -> bool:
return bool(
@@ -370,6 +410,7 @@ class SlidingSyncResult:
or self.account_data
or self.receipts
or self.typing
or self.thread_subscriptions
)
next_pos: SlidingSyncStreamToken

View File

@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
from synapse._pydantic_compat import (
Extra,
Field,
StrictBool,
StrictInt,
StrictStr,
@@ -364,11 +365,25 @@ class SlidingSyncBody(RequestBodyModel):
# Process all room subscriptions defined in the Room Subscription API. (This is the default.)
rooms: Optional[List[StrictStr]] = ["*"]
class ThreadSubscriptionsExtension(RequestBodyModel):
"""The Thread Subscriptions extension (MSC4308)
Attributes:
enabled
limit: maximum number of subscription changes to return (default 100)
"""
enabled: Optional[StrictBool] = False
limit: StrictInt = 100
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
typing: Optional[TypingExtension] = None
thread_subscriptions: Optional[ThreadSubscriptionsExtension] = Field(
alias="io.element.msc4308.thread_subscriptions"
)
conn_id: Optional[StrictStr]

View File

@@ -347,6 +347,7 @@ T2 = TypeVar("T2")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")
T6 = TypeVar("T6")
@overload
@@ -461,6 +462,23 @@ async def gather_optional_coroutines(
) -> Tuple[Optional[T1], Optional[T2], Optional[T3], Optional[T4], Optional[T5]]: ...
@overload
async def gather_optional_coroutines(
*coroutines: Unpack[
Tuple[
Optional[Coroutine[Any, Any, T1]],
Optional[Coroutine[Any, Any, T2]],
Optional[Coroutine[Any, Any, T3]],
Optional[Coroutine[Any, Any, T4]],
Optional[Coroutine[Any, Any, T5]],
Optional[Coroutine[Any, Any, T6]],
]
],
) -> Tuple[
Optional[T1], Optional[T2], Optional[T3], Optional[T4], Optional[T5], Optional[T6]
]: ...
async def gather_optional_coroutines(
*coroutines: Unpack[Tuple[Optional[Coroutine[Any, Any, T1]], ...]],
) -> Tuple[Optional[T1], ...]:

View File

@@ -579,9 +579,12 @@ def cachedList(
Used to do batch lookups for an already created cache. One of the arguments
is specified as a list that is iterated through to lookup keys in the
original cache. A new tuple consisting of the (deduplicated) keys that weren't in
the cache gets passed to the original function, which is expected to results
the cache gets passed to the original function, which is expected to result
in a map of key to value for each passed value. The new results are stored in the
original cache. Note that any missing values are cached as None.
original cache.
Note that any values in the input that end up being missing from both the
cache and the returned dictionary will be cached as `None`.
Args:
cached_method_name: The name of the single-item lookup method.

View File

@@ -29,6 +29,11 @@ import sys
from types import FrameType, TracebackType
from typing import NoReturn, Optional, Type
from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
)
def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") -> None:
"""daemonize the current process
@@ -64,8 +69,14 @@ def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") -
pid_fh.write(old_pid)
sys.exit(1)
# Fork, creating a new process for the child.
process_id = os.fork()
# Stop the existing context *before* we fork the process. Otherwise the cputime
# metrics get confused about the per-thread resource usage appearing to go backwards
# because we're comparing the resource usage from the original process to the forked
# process. `PreserveLoggingContext` already takes care of restarting the original
# context *after* the block.
with PreserveLoggingContext():
# Fork, creating a new process for the child.
process_id = os.fork()
if process_id != 0:
# parent process: exit.
@@ -140,9 +151,10 @@ def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") -
# Cleanup pid file at exit.
def exit() -> None:
logger.warning("Stopping daemon.")
os.remove(pid_file)
sys.exit(0)
with LoggingContext("atexit"):
logger.warning("Stopping daemon.")
os.remove(pid_file)
sys.exit(0)
atexit.register(exit)

View File

@@ -35,7 +35,7 @@ from synapse.config._base import RootConfig
from synapse.config.auto_accept_invites import AutoAcceptInvitesConfig
from synapse.events.auto_accept_invites import InviteAutoAccepter
from synapse.federation.federation_base import event_from_pdu_json
from synapse.handlers.sync import JoinedSyncResult, SyncRequestKey, SyncVersion
from synapse.handlers.sync import JoinedSyncResult, SyncRequestKey
from synapse.module_api import ModuleApi
from synapse.rest import admin
from synapse.rest.client import login, room
@@ -548,7 +548,6 @@ def sync_join(
testcase.hs.get_sync_handler().wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
generate_request_key(),
since_token,
)

View File

@@ -36,7 +36,7 @@ from synapse.server import HomeServer
from synapse.types import JsonDict, StreamToken, create_requester
from synapse.util import Clock
from tests.handlers.test_sync import SyncRequestKey, SyncVersion, generate_sync_config
from tests.handlers.test_sync import SyncRequestKey, generate_sync_config
from tests.unittest import (
FederatingHomeserverTestCase,
HomeserverTestCase,
@@ -532,7 +532,6 @@ def sync_presence(
testcase.hs.get_sync_handler().wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
generate_request_key(),
since_token,
)

View File

@@ -37,7 +37,6 @@ from synapse.handlers.sync import (
SyncConfig,
SyncRequestKey,
SyncResult,
SyncVersion,
TimelineBatch,
)
from synapse.rest import admin
@@ -113,7 +112,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -124,7 +122,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
),
ResourceLimitError,
@@ -142,7 +139,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
),
ResourceLimitError,
@@ -167,7 +163,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
sync_config=generate_sync_config(
user, device_id="dev", use_state_after=self.use_state_after
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -203,7 +198,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
sync_config=generate_sync_config(
user, use_state_after=self.use_state_after
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -218,7 +212,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
sync_config=generate_sync_config(
user, device_id="dev", use_state_after=self.use_state_after
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_result.next_batch,
)
@@ -252,7 +245,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
sync_config=generate_sync_config(
user, use_state_after=self.use_state_after
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -267,7 +259,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
sync_config=generate_sync_config(
user, device_id="dev", use_state_after=self.use_state_after
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_result.next_batch,
)
@@ -310,7 +301,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(owner),
generate_sync_config(owner, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -336,7 +326,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
eve_requester,
eve_sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -363,7 +352,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
eve_requester,
eve_sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=eve_sync_after_ban.next_batch,
)
@@ -376,7 +364,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
eve_requester,
eve_sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=None,
)
@@ -411,7 +398,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -441,7 +427,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_sync_result.next_batch,
)
@@ -487,7 +472,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -527,7 +511,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_sync_result.next_batch,
)
@@ -576,7 +559,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -603,7 +585,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_sync_result.next_batch,
)
@@ -643,7 +624,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=incremental_sync.next_batch,
)
@@ -717,7 +697,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -743,7 +722,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -769,7 +747,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_sync_result.next_batch,
)
@@ -833,7 +810,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
bob_requester,
generate_sync_config(bob, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -867,7 +843,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
filter_collection=FilterCollection(self.hs, filter_dict),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=None if initial_sync else initial_sync_result.next_batch,
)
@@ -967,7 +942,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -1016,7 +990,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(user2),
generate_sync_config(user2, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -1042,7 +1015,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -1079,7 +1051,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=since_token,
timeout=0,
@@ -1134,7 +1105,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=since_token,
timeout=0,

View File

@@ -19,6 +19,7 @@
#
#
from http import HTTPStatus
from typing import Any, Optional
from unittest.mock import AsyncMock, patch
@@ -30,7 +31,7 @@ from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.room_versions import RoomVersions
from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator
from synapse.rest import admin
from synapse.rest.client import login, register, room
from synapse.rest.client import login, push_rule, register, room
from synapse.server import HomeServer
from synapse.types import JsonDict, create_requester
from synapse.util import Clock
@@ -44,6 +45,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
room.register_servlets,
login.register_servlets,
register.register_servlets,
push_rule.register_servlets,
]
def prepare(
@@ -494,6 +496,135 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
)
)
@override_config({"experimental_features": {"msc4306_enabled": True}})
def test_thread_subscriptions_suppression_after_keyword_mention_overrides(
self,
) -> None:
"""
Tests one of the purposes of the `postcontent` push rule section:
When a keyword mention is configured (in the `content` section),
it does not get suppressed by the thread being unsubscribed.
"""
# add a keyword mention to alice's push rules
channel = self.make_request(
"PUT",
"/_matrix/client/v3/pushrules/global/content/biscuits",
{"pattern": "biscuits", "actions": ["notify"]},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
(thread_root_id,) = self.helper.send_messages(self.room_id, 1, tok=self.token)
self.assertFalse(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "do you want some cookies?",
"m.relates_to": {
"rel_type": RelationTypes.THREAD,
"event_id": thread_root_id,
},
},
type="m.room.message",
),
"alice is not subscribed to thread and does not have a mention on 'cookies' so should not be notified",
)
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "biscuits are available in the kitchen",
"m.relates_to": {
"rel_type": RelationTypes.THREAD,
"event_id": thread_root_id,
},
},
type="m.room.message",
),
"alice is not subscribed to thread but DOES have a mention on 'biscuits' so should be notified",
)
@override_config({"experimental_features": {"msc4306_enabled": True}})
def test_thread_subscriptions_notification_before_keywords_and_mentions(
self,
) -> None:
"""
Tests one of the purposes of the `postcontent` push rule section:
When a room is set to (what is commonly known as) 'keywords & mentions', we still receive notifications
for messages in threads that we are subscribed to.
Effectively making this 'keywords, mentions & subscriptions'
"""
# add a 'keywords & mentions' setting to the room alice's push rules
# In case this rule isn't clear: by adding a rule in the `room` section that does nothing,
# it stops execution of the push rules before we fall through to the `underride` section,
# where intuitively many kinds of messages will ambiently generate notifications.
# Mentions and keywords are triggered before the `room` block, so this doesn't suppress those.
channel = self.make_request(
"PUT",
f"/_matrix/client/v3/pushrules/global/room/{self.room_id}",
{"actions": []},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
(thread_root_id,) = self.helper.send_messages(self.room_id, 1, tok=self.token)
# sanity check that our mentions still work
self.assertFalse(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "this is a plain message with no mention",
},
type="m.room.message",
),
"alice should not be notified (mentions & keywords room setting)",
)
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "this is a message that mentions alice",
},
type="m.room.message",
),
"alice should be notified (mentioned)",
)
# let's have alice subscribe to the thread
self.get_success(
self.hs.get_datastores().main.subscribe_user_to_thread(
self.alice,
self.room_id,
thread_root_id,
automatic_event_orderings=None,
)
)
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "some message in the thread",
"m.relates_to": {
"rel_type": RelationTypes.THREAD,
"event_id": thread_root_id,
},
},
type="m.room.message",
),
"alice is subscribed to thread so should be notified",
)
def test_with_disabled_thread_subscriptions(self) -> None:
"""
Test what happens with threaded events when MSC4306 is disabled.

View File

@@ -18,12 +18,12 @@
#
#
import email.message
import importlib.resources as importlib_resources
import os
from http import HTTPStatus
from typing import Any, Dict, List, Sequence, Tuple
import attr
import pkg_resources
from parameterized import parameterized
from twisted.internet.defer import Deferred
@@ -59,11 +59,12 @@ class EmailPusherTests(HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
config = self.default_config()
templates = (
importlib_resources.files("synapse").joinpath("res").joinpath("templates")
)
config["email"] = {
"enable_notifs": True,
"template_dir": os.path.abspath(
pkg_resources.resource_filename("synapse", "res/templates")
),
"template_dir": os.path.abspath(str(templates)),
"expiry_template_html": "notice_expiry.html",
"expiry_template_text": "notice_expiry.txt",
"notif_template_html": "notif_mail.html",

View File

@@ -2244,7 +2244,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
def test_topo_token_is_accepted(self) -> None:
"""Test Topo Token is accepted."""
token = "t1-0_0_0_0_0_0_0_0_0_0"
token = "t1-0_0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@@ -2258,7 +2258,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
"""Test that stream token is accepted for forward pagination."""
token = "s0_0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),

View File

@@ -0,0 +1,497 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import logging
from http import HTTPStatus
from typing import List, Optional, Tuple, cast
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.rest.client import login, room, sync, thread_subscriptions
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
logger = logging.getLogger(__name__)
# The name of the extension. Currently unstable-prefixed.
EXT_NAME = "io.element.msc4308.thread_subscriptions"
class SlidingSyncThreadSubscriptionsExtensionTestCase(SlidingSyncBase):
"""
Test the thread subscriptions extension in the Sliding Sync API.
"""
maxDiff = None
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
room.register_servlets,
sync.register_servlets,
thread_subscriptions.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["experimental_features"] = {"msc4306_enabled": True}
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.storage_controllers = hs.get_storage_controllers()
super().prepare(reactor, clock, hs)
def test_no_data_initial_sync(self) -> None:
"""
Test enabling thread subscriptions extension during initial sync with no data.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
sync_body = {
"lists": {},
"extensions": {
EXT_NAME: {
"enabled": True,
}
},
}
# Sync
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Assert
self.assertNotIn(EXT_NAME, response_body["extensions"])
def test_no_data_incremental_sync(self) -> None:
"""
Test enabling thread subscriptions extension during incremental sync with no data.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
initial_sync_body: JsonDict = {
"lists": {},
}
# Initial sync
response_body, sync_pos = self.do_sync(initial_sync_body, tok=user1_tok)
# Incremental sync with extension enabled
sync_body = {
"lists": {},
"extensions": {
EXT_NAME: {
"enabled": True,
}
},
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok, since=sync_pos)
# Assert
self.assertNotIn(
EXT_NAME,
response_body["extensions"],
response_body,
)
def test_thread_subscription_initial_sync(self) -> None:
"""
Test thread subscriptions appear in initial sync response.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
thread_root_resp = self.helper.send(room_id, body="Thread root", tok=user1_tok)
thread_root_id = thread_root_resp["event_id"]
# get the baseline stream_id of the thread_subscriptions stream
# before we write any data.
# Required because the initial value differs between SQLite and Postgres.
base = self.store.get_max_thread_subscriptions_stream_id()
self._subscribe_to_thread(user1_id, room_id, thread_root_id)
sync_body = {
"lists": {},
"extensions": {
EXT_NAME: {
"enabled": True,
}
},
}
# Sync
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Assert
self.assertEqual(
response_body["extensions"][EXT_NAME],
{
"subscribed": {
room_id: {
thread_root_id: {
"automatic": False,
"bump_stamp": base + 1,
}
}
}
},
)
def test_thread_subscription_incremental_sync(self) -> None:
"""
Test new thread subscriptions appear in incremental sync response.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
sync_body = {
"lists": {},
"extensions": {
EXT_NAME: {
"enabled": True,
}
},
}
thread_root_resp = self.helper.send(room_id, body="Thread root", tok=user1_tok)
thread_root_id = thread_root_resp["event_id"]
# get the baseline stream_id of the thread_subscriptions stream
# before we write any data.
# Required because the initial value differs between SQLite and Postgres.
base = self.store.get_max_thread_subscriptions_stream_id()
# Initial sync
_, sync_pos = self.do_sync(sync_body, tok=user1_tok)
logger.info("Synced to: %r, now subscribing to thread", sync_pos)
# Subscribe
self._subscribe_to_thread(user1_id, room_id, thread_root_id)
# Incremental sync
response_body, sync_pos = self.do_sync(sync_body, tok=user1_tok, since=sync_pos)
logger.info("Synced to: %r", sync_pos)
# Assert
self.assertEqual(
response_body["extensions"][EXT_NAME],
{
"subscribed": {
room_id: {
thread_root_id: {
"automatic": False,
"bump_stamp": base + 1,
}
}
}
},
)
def test_unsubscribe_from_thread(self) -> None:
"""
Test unsubscribing from a thread.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
thread_root_resp = self.helper.send(room_id, body="Thread root", tok=user1_tok)
thread_root_id = thread_root_resp["event_id"]
# get the baseline stream_id of the thread_subscriptions stream
# before we write any data.
# Required because the initial value differs between SQLite and Postgres.
base = self.store.get_max_thread_subscriptions_stream_id()
self._subscribe_to_thread(user1_id, room_id, thread_root_id)
sync_body = {
"lists": {},
"extensions": {
EXT_NAME: {
"enabled": True,
}
},
}
response_body, sync_pos = self.do_sync(sync_body, tok=user1_tok)
# Assert: Subscription present
self.assertIn(EXT_NAME, response_body["extensions"])
self.assertEqual(
response_body["extensions"][EXT_NAME],
{
"subscribed": {
room_id: {
thread_root_id: {"automatic": False, "bump_stamp": base + 1}
}
}
},
)
# Unsubscribe
self._unsubscribe_from_thread(user1_id, room_id, thread_root_id)
# Incremental sync
response_body, sync_pos = self.do_sync(sync_body, tok=user1_tok, since=sync_pos)
# Assert: Unsubscription present
self.assertEqual(
response_body["extensions"][EXT_NAME],
{"unsubscribed": {room_id: {thread_root_id: {"bump_stamp": base + 2}}}},
)
def test_multiple_thread_subscriptions(self) -> None:
"""
Test handling of multiple thread subscriptions.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
# Create thread roots
thread_root_resp1 = self.helper.send(
room_id, body="Thread root 1", tok=user1_tok
)
thread_root_id1 = thread_root_resp1["event_id"]
thread_root_resp2 = self.helper.send(
room_id, body="Thread root 2", tok=user1_tok
)
thread_root_id2 = thread_root_resp2["event_id"]
thread_root_resp3 = self.helper.send(
room_id, body="Thread root 3", tok=user1_tok
)
thread_root_id3 = thread_root_resp3["event_id"]
# get the baseline stream_id of the thread_subscriptions stream
# before we write any data.
# Required because the initial value differs between SQLite and Postgres.
base = self.store.get_max_thread_subscriptions_stream_id()
# Subscribe to threads
self._subscribe_to_thread(user1_id, room_id, thread_root_id1)
self._subscribe_to_thread(user1_id, room_id, thread_root_id2)
self._subscribe_to_thread(user1_id, room_id, thread_root_id3)
sync_body = {
"lists": {},
"extensions": {
EXT_NAME: {
"enabled": True,
}
},
}
# Sync
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Assert
self.assertEqual(
response_body["extensions"][EXT_NAME],
{
"subscribed": {
room_id: {
thread_root_id1: {
"automatic": False,
"bump_stamp": base + 1,
},
thread_root_id2: {
"automatic": False,
"bump_stamp": base + 2,
},
thread_root_id3: {
"automatic": False,
"bump_stamp": base + 3,
},
}
}
},
)
def test_limit_parameter(self) -> None:
"""
Test limit parameter in thread subscriptions extension.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
# Create 5 thread roots and subscribe to each
thread_root_ids = []
for i in range(5):
thread_root_resp = self.helper.send(
room_id, body=f"Thread root {i}", tok=user1_tok
)
thread_root_ids.append(thread_root_resp["event_id"])
self._subscribe_to_thread(user1_id, room_id, thread_root_ids[-1])
sync_body = {
"lists": {},
"extensions": {EXT_NAME: {"enabled": True, "limit": 3}},
}
# Sync
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Assert
thread_subscriptions = response_body["extensions"][EXT_NAME]
self.assertEqual(
len(thread_subscriptions["subscribed"][room_id]), 3, thread_subscriptions
)
def test_limit_and_companion_backpagination(self) -> None:
"""
Create 1 thread subscription, do a sync, create 4 more,
then sync with a limit of 2 and fill in the gap
using the companion /thread_subscriptions endpoint.
"""
thread_root_ids: List[str] = []
def make_subscription() -> None:
thread_root_resp = self.helper.send(
room_id, body="Some thread root", tok=user1_tok
)
thread_root_ids.append(thread_root_resp["event_id"])
self._subscribe_to_thread(user1_id, room_id, thread_root_ids[-1])
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
# get the baseline stream_id of the thread_subscriptions stream
# before we write any data.
# Required because the initial value differs between SQLite and Postgres.
base = self.store.get_max_thread_subscriptions_stream_id()
# Make our first subscription
make_subscription()
# Sync for the first time
sync_body = {
"lists": {},
"extensions": {EXT_NAME: {"enabled": True, "limit": 2}},
}
sync_resp, first_sync_pos = self.do_sync(sync_body, tok=user1_tok)
thread_subscriptions = sync_resp["extensions"][EXT_NAME]
self.assertEqual(
thread_subscriptions["subscribed"],
{
room_id: {
thread_root_ids[0]: {"automatic": False, "bump_stamp": base + 1},
}
},
)
# Get our pos for the next sync
first_sync_pos = sync_resp["pos"]
# Create 5 more thread subscriptions and subscribe to each
for _ in range(5):
make_subscription()
# Now sync again. Our limit is 2,
# so we should get the latest 2 subscriptions,
# with a gap of 3 more subscriptions in the middle
sync_resp, _pos = self.do_sync(sync_body, tok=user1_tok, since=first_sync_pos)
thread_subscriptions = sync_resp["extensions"][EXT_NAME]
self.assertEqual(
thread_subscriptions["subscribed"],
{
room_id: {
thread_root_ids[4]: {"automatic": False, "bump_stamp": base + 5},
thread_root_ids[5]: {"automatic": False, "bump_stamp": base + 6},
}
},
)
# 1st backpagination: expecting a page with 2 subscriptions
page, end_tok = self._do_backpaginate(
from_tok=thread_subscriptions["prev_batch"],
to_tok=first_sync_pos,
limit=2,
access_token=user1_tok,
)
self.assertIsNotNone(end_tok, "backpagination should continue")
self.assertEqual(
page["subscribed"],
{
room_id: {
thread_root_ids[2]: {"automatic": False, "bump_stamp": base + 3},
thread_root_ids[3]: {"automatic": False, "bump_stamp": base + 4},
}
},
)
# 2nd backpagination: expecting a page with only 1 subscription
# and no other token for further backpagination
assert end_tok is not None
page, end_tok = self._do_backpaginate(
from_tok=end_tok, to_tok=first_sync_pos, limit=2, access_token=user1_tok
)
self.assertIsNone(end_tok, "backpagination should have finished")
self.assertEqual(
page["subscribed"],
{
room_id: {
thread_root_ids[1]: {"automatic": False, "bump_stamp": base + 2},
}
},
)
def _do_backpaginate(
self, *, from_tok: str, to_tok: str, limit: int, access_token: str
) -> Tuple[JsonDict, Optional[str]]:
channel = self.make_request(
"GET",
"/_matrix/client/unstable/io.element.msc4308/thread_subscriptions"
f"?from={from_tok}&to={to_tok}&limit={limit}&dir=b",
access_token=access_token,
)
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
body = channel.json_body
return body, cast(Optional[str], body.get("end"))
def _subscribe_to_thread(
self, user_id: str, room_id: str, thread_root_id: str
) -> None:
"""
Helper method to subscribe a user to a thread.
"""
self.get_success(
self.store.subscribe_user_to_thread(
user_id=user_id,
room_id=room_id,
thread_root_event_id=thread_root_id,
automatic_event_orderings=None,
)
)
def _unsubscribe_from_thread(
self, user_id: str, room_id: str, thread_root_id: str
) -> None:
"""
Helper method to unsubscribe a user from a thread.
"""
self.get_success(
self.store.unsubscribe_user_from_thread(
user_id=user_id,
room_id=room_id,
thread_root_event_id=thread_root_id,
)
)

View File

@@ -18,6 +18,7 @@
# [This file includes modifications made by New Vector Limited]
#
#
import importlib.resources as importlib_resources
import os
import re
from email.parser import Parser
@@ -25,8 +26,6 @@ from http import HTTPStatus
from typing import Any, Dict, List, Optional, Union
from unittest.mock import Mock
import pkg_resources
from twisted.internet.interfaces import IReactorTCP
from twisted.internet.testing import MemoryReactor
@@ -59,11 +58,12 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
config = self.default_config()
# Email config.
templates = (
importlib_resources.files("synapse").joinpath("res").joinpath("templates")
)
config["email"] = {
"enable_notifs": False,
"template_dir": os.path.abspath(
pkg_resources.resource_filename("synapse", "res/templates")
),
"template_dir": os.path.abspath(str(templates)),
"smtp_host": "127.0.0.1",
"smtp_port": 20,
"require_transport_security": False,
@@ -798,11 +798,12 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
config = self.default_config()
# Email config.
templates = (
importlib_resources.files("synapse").joinpath("res").joinpath("templates")
)
config["email"] = {
"enable_notifs": False,
"template_dir": os.path.abspath(
pkg_resources.resource_filename("synapse", "res/templates")
),
"template_dir": os.path.abspath(str(templates)),
"smtp_host": "127.0.0.1",
"smtp_port": 20,
"require_transport_security": False,

View File

@@ -46,6 +46,7 @@ from twisted.web.resource import Resource
from synapse.api.errors import HttpResponseException
from synapse.api.ratelimiting import Ratelimiter
from synapse.config._base import Config
from synapse.config.oembed import OEmbedEndpointConfig
from synapse.http.client import MultipartResponse
from synapse.http.types import QueryParams
@@ -53,6 +54,7 @@ from synapse.logging.context import make_deferred_yieldable
from synapse.media._base import FileInfo, ThumbnailInfo
from synapse.media.thumbnailer import ThumbnailProvider
from synapse.media.url_previewer import IMAGE_CACHE_EXPIRY_MS
from synapse.module_api import MediaUploadLimit
from synapse.rest import admin
from synapse.rest.client import login, media
from synapse.server import HomeServer
@@ -2967,3 +2969,192 @@ class MediaUploadLimits(unittest.HomeserverTestCase):
# This will succeed as the weekly limit has reset
channel = self.upload_media(900)
self.assertEqual(channel.code, 200)
class MediaUploadLimitsModuleOverrides(unittest.HomeserverTestCase):
"""
This test case simulates a homeserver with media upload limits being overridden by the module API.
"""
servlets = [
media.register_servlets,
login.register_servlets,
admin.register_servlets,
]
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
config = self.default_config()
self.storage_path = self.mktemp()
self.media_store_path = self.mktemp()
os.mkdir(self.storage_path)
os.mkdir(self.media_store_path)
config["media_store_path"] = self.media_store_path
provider_config = {
"module": "synapse.media.storage_provider.FileStorageProviderBackend",
"store_local": True,
"store_synchronous": False,
"store_remote": True,
"config": {"directory": self.storage_path},
}
config["media_storage_providers"] = [provider_config]
# default limits to use
config["media_upload_limits"] = [
{"time_period": "1d", "max_size": "1K"},
{"time_period": "1w", "max_size": "3K"},
]
return self.setup_test_homeserver(config=config)
async def _get_media_upload_limits_for_user(
self,
user_id: str,
) -> Optional[List[MediaUploadLimit]]:
# user1 has custom limits
if user_id == self.user1:
# n.b. we return these in increasing duration order and Synapse will need to sort them correctly
return [
MediaUploadLimit(
time_period_ms=Config.parse_duration("1d"), max_bytes=5000
),
MediaUploadLimit(
time_period_ms=Config.parse_duration("1w"), max_bytes=15000
),
]
# user2 has no limits
if user_id == self.user2:
return []
# otherwise use default
return None
async def _on_media_upload_limit_exceeded(
self,
user_id: str,
limit: MediaUploadLimit,
sent_bytes: int,
attempted_bytes: int,
) -> None:
self.last_media_upload_limit_exceeded: Optional[dict[str, object]] = {
"user_id": user_id,
"limit": limit,
"sent_bytes": sent_bytes,
"attempted_bytes": attempted_bytes,
}
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.repo = hs.get_media_repository()
self.client = hs.get_federation_http_client()
self.store = hs.get_datastores().main
self.user1 = self.register_user("user1", "pass")
self.tok1 = self.login("user1", "pass")
self.user2 = self.register_user("user2", "pass")
self.tok2 = self.login("user2", "pass")
self.user3 = self.register_user("user3", "pass")
self.tok3 = self.login("user3", "pass")
self.last_media_upload_limit_exceeded = None
self.hs.get_module_api().register_media_repository_callbacks(
get_media_upload_limits_for_user=self._get_media_upload_limits_for_user,
on_media_upload_limit_exceeded=self._on_media_upload_limit_exceeded,
)
def create_resource_dict(self) -> Dict[str, Resource]:
resources = super().create_resource_dict()
resources["/_matrix/media"] = self.hs.get_media_repository_resource()
return resources
def upload_media(self, size: int, tok: str) -> FakeChannel:
"""Helper to upload media of a given size with a given token."""
return self.make_request(
"POST",
"/_matrix/media/v3/upload",
content=b"0" * size,
access_token=tok,
shorthand=False,
content_type=b"text/plain",
custom_headers=[("Content-Length", str(size))],
)
def test_upload_under_limit(self) -> None:
"""Test that uploading media under the limit works."""
# User 1 uploads 100 bytes
channel = self.upload_media(100, self.tok1)
self.assertEqual(channel.code, 200)
# User 2 (unlimited) uploads 100 bytes
channel = self.upload_media(100, self.tok2)
self.assertEqual(channel.code, 200)
# User 3 (default) uploads 100 bytes
channel = self.upload_media(100, self.tok3)
self.assertEqual(channel.code, 200)
self.assertEqual(self.last_media_upload_limit_exceeded, None)
def test_uses_custom_limit(self) -> None:
"""Test that uploading media over the module provided daily limit fails."""
# User 1 uploads 3000 bytes
channel = self.upload_media(3000, self.tok1)
self.assertEqual(channel.code, 200)
# User 1 attempts to upload 4000 bytes taking it over the limit
channel = self.upload_media(4000, self.tok1)
self.assertEqual(channel.code, 400)
assert self.last_media_upload_limit_exceeded is not None
self.assertEqual(self.last_media_upload_limit_exceeded["user_id"], self.user1)
self.assertEqual(
self.last_media_upload_limit_exceeded["limit"],
MediaUploadLimit(
max_bytes=5000, time_period_ms=Config.parse_duration("1d")
),
)
self.assertEqual(self.last_media_upload_limit_exceeded["sent_bytes"], 3000)
self.assertEqual(self.last_media_upload_limit_exceeded["attempted_bytes"], 4000)
# User 1 attempts to upload 20000 bytes which is over the weekly limit
# This tests that the limits have been sorted as expected
channel = self.upload_media(20000, self.tok1)
self.assertEqual(channel.code, 400)
assert self.last_media_upload_limit_exceeded is not None
self.assertEqual(self.last_media_upload_limit_exceeded["user_id"], self.user1)
self.assertEqual(
self.last_media_upload_limit_exceeded["limit"],
MediaUploadLimit(
max_bytes=15000, time_period_ms=Config.parse_duration("1w")
),
)
self.assertEqual(self.last_media_upload_limit_exceeded["sent_bytes"], 3000)
self.assertEqual(
self.last_media_upload_limit_exceeded["attempted_bytes"], 20000
)
def test_uses_unlimited(self) -> None:
"""Test that unlimited user is not limited when module returns []."""
# User 2 uploads 10000 bytes which is over the default limit
channel = self.upload_media(10000, self.tok2)
self.assertEqual(channel.code, 200)
self.assertEqual(self.last_media_upload_limit_exceeded, None)
def test_uses_defaults(self) -> None:
"""Test that the default limits are applied when module returned None."""
# User 3 uploads 500 bytes
channel = self.upload_media(500, self.tok3)
self.assertEqual(channel.code, 200)
# User 3 uploads 800 bytes which is over the limit
channel = self.upload_media(800, self.tok3)
self.assertEqual(channel.code, 400)
assert self.last_media_upload_limit_exceeded is not None
self.assertEqual(self.last_media_upload_limit_exceeded["user_id"], self.user3)
self.assertEqual(
self.last_media_upload_limit_exceeded["limit"],
MediaUploadLimit(
max_bytes=1024, time_period_ms=Config.parse_duration("1d")
),
)
self.assertEqual(self.last_media_upload_limit_exceeded["sent_bytes"], 500)
self.assertEqual(self.last_media_upload_limit_exceeded["attempted_bytes"], 800)

View File

@@ -18,6 +18,8 @@
# [This file includes modifications made by New Vector Limited]
#
#
from http import HTTPStatus
import synapse
from synapse.api.errors import Codes
from synapse.rest.client import login, push_rule, room
@@ -486,3 +488,23 @@ class PushRuleAttributesTestCase(HomeserverTestCase):
},
channel.json_body,
)
def test_no_user_defined_postcontent_rules(self) -> None:
"""
Tests that clients are not permitted to create MSC4306 `postcontent` rules.
"""
self.register_user("bob", "pass")
token = self.login("bob", "pass")
channel = self.make_request(
"PUT",
"/pushrules/global/postcontent/some.user.rule",
{},
access_token=token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST)
self.assertEqual(
Codes.INVALID_PARAM,
channel.json_body["errcode"],
)

View File

@@ -20,12 +20,11 @@
#
#
import datetime
import importlib.resources as importlib_resources
import os
from typing import Any, Dict, List, Tuple
from unittest.mock import AsyncMock
import pkg_resources
from twisted.internet.testing import MemoryReactor
import synapse.rest.admin
@@ -981,11 +980,12 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase):
# Email config.
templates = (
importlib_resources.files("synapse").joinpath("res").joinpath("templates")
)
config["email"] = {
"enable_notifs": True,
"template_dir": os.path.abspath(
pkg_resources.resource_filename("synapse", "res/templates")
),
"template_dir": os.path.abspath(str(templates)),
"expiry_template_html": "notice_expiry.html",
"expiry_template_text": "notice_expiry.txt",
"notif_template_html": "notif_mail.html",

View File

@@ -2245,7 +2245,7 @@ class RoomMessageListTestCase(RoomBase):
self.room_id = self.helper.create_room_as(self.user_id)
def test_topo_token_is_accepted(self) -> None:
token = "t1-0_0_0_0_0_0_0_0_0_0"
token = "t1-0_0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
)
@@ -2256,7 +2256,7 @@ class RoomMessageListTestCase(RoomBase):
self.assertTrue("end" in channel.json_body)
def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
token = "s0_0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
)

View File

@@ -18,27 +18,13 @@
# [This file includes modifications made by New Vector Limited]
#
#
from parameterized import parameterized_class
from synapse.api.constants import EduTypes
from synapse.rest import admin
from synapse.rest.client import login, sendtodevice, sync
from synapse.types import JsonDict
from tests.unittest import HomeserverTestCase, override_config
@parameterized_class(
("sync_endpoint", "experimental_features"),
[
("/sync", {}),
(
"/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee",
# Enable sliding sync
{"msc3575_enabled": True},
),
],
)
class SendToDeviceTestCase(HomeserverTestCase):
"""
Test `/sendToDevice` will deliver messages across to people receiving them over `/sync`.
@@ -48,9 +34,6 @@ class SendToDeviceTestCase(HomeserverTestCase):
experimental_features: The experimental features homeserver config to use.
"""
sync_endpoint: str
experimental_features: JsonDict
servlets = [
admin.register_servlets,
login.register_servlets,
@@ -58,11 +41,6 @@ class SendToDeviceTestCase(HomeserverTestCase):
sync.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["experimental_features"] = self.experimental_features
return config
def test_user_to_user(self) -> None:
"""A to-device message from one user to another should get delivered"""
@@ -83,7 +61,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
self.assertEqual(chan.code, 200, chan.result)
# check it appears
channel = self.make_request("GET", self.sync_endpoint, access_token=user2_tok)
channel = self.make_request("GET", "/sync", access_token=user2_tok)
self.assertEqual(channel.code, 200, channel.result)
expected_result = {
"events": [
@@ -99,7 +77,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
# it should re-appear if we do another sync because the to-device message is not
# deleted until we acknowledge it by sending a `?since=...` parameter in the
# next sync request corresponding to the `next_batch` value from the response.
channel = self.make_request("GET", self.sync_endpoint, access_token=user2_tok)
channel = self.make_request("GET", "/sync", access_token=user2_tok)
self.assertEqual(channel.code, 200, channel.result)
self.assertEqual(channel.json_body["to_device"], expected_result)
@@ -107,7 +85,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
sync_token = channel.json_body["next_batch"]
channel = self.make_request(
"GET",
f"{self.sync_endpoint}?since={sync_token}",
f"/sync?since={sync_token}",
access_token=user2_tok,
)
self.assertEqual(channel.code, 200, channel.result)
@@ -133,7 +111,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
self.assertEqual(chan.code, 200, chan.result)
# now sync: we should get two of the three (because burst_count=2)
channel = self.make_request("GET", self.sync_endpoint, access_token=user2_tok)
channel = self.make_request("GET", "/sync", access_token=user2_tok)
self.assertEqual(channel.code, 200, channel.result)
msgs = channel.json_body["to_device"]["events"]
self.assertEqual(len(msgs), 2)
@@ -163,7 +141,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
# ... which should arrive
channel = self.make_request(
"GET",
f"{self.sync_endpoint}?since={sync_token}",
f"/sync?since={sync_token}",
access_token=user2_tok,
)
self.assertEqual(channel.code, 200, channel.result)
@@ -198,7 +176,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
)
# now sync: we should get two of the three
channel = self.make_request("GET", self.sync_endpoint, access_token=user2_tok)
channel = self.make_request("GET", "/sync", access_token=user2_tok)
self.assertEqual(channel.code, 200, channel.result)
msgs = channel.json_body["to_device"]["events"]
self.assertEqual(len(msgs), 2)
@@ -233,7 +211,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
# ... which should arrive
channel = self.make_request(
"GET",
f"{self.sync_endpoint}?since={sync_token}",
f"/sync?since={sync_token}",
access_token=user2_tok,
)
self.assertEqual(channel.code, 200, channel.result)
@@ -258,7 +236,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
user2_tok = self.login("u2", "pass", "d2")
# Do an initial sync
channel = self.make_request("GET", self.sync_endpoint, access_token=user2_tok)
channel = self.make_request("GET", "/sync", access_token=user2_tok)
self.assertEqual(channel.code, 200, channel.result)
sync_token = channel.json_body["next_batch"]
@@ -275,7 +253,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
channel = self.make_request(
"GET",
f"{self.sync_endpoint}?since={sync_token}&timeout=300000",
f"/sync?since={sync_token}&timeout=300000",
access_token=user2_tok,
)
self.assertEqual(channel.code, 200, channel.result)
@@ -285,7 +263,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
channel = self.make_request(
"GET",
f"{self.sync_endpoint}?since={sync_token}&timeout=300000",
f"/sync?since={sync_token}&timeout=300000",
access_token=user2_tok,
)
self.assertEqual(channel.code, 200, channel.result)

View File

@@ -22,7 +22,7 @@ import json
import logging
from typing import List
from parameterized import parameterized, parameterized_class
from parameterized import parameterized
from twisted.internet.testing import MemoryReactor
@@ -702,29 +702,11 @@ class SyncCacheTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.code, 200, channel.json_body)
@parameterized_class(
("sync_endpoint", "experimental_features"),
[
("/sync", {}),
(
"/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee",
# Enable sliding sync
{"msc3575_enabled": True},
),
],
)
class DeviceListSyncTestCase(unittest.HomeserverTestCase):
"""
Tests regarding device list (`device_lists`) changes.
Attributes:
sync_endpoint: The endpoint under test to use for syncing.
experimental_features: The experimental features homeserver config to use.
"""
sync_endpoint: str
experimental_features: JsonDict
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
@@ -733,11 +715,6 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
devices.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["experimental_features"] = self.experimental_features
return config
def test_receiving_local_device_list_changes(self) -> None:
"""Tests that a local users that share a room receive each other's device list
changes.
@@ -767,7 +744,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
# Now have Bob initiate an initial sync (in order to get a since token)
channel = self.make_request(
"GET",
self.sync_endpoint,
"/sync",
access_token=bob_access_token,
)
self.assertEqual(channel.code, 200, channel.json_body)
@@ -777,7 +754,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
# which we hope will happen as a result of Alice updating their device list.
bob_sync_channel = self.make_request(
"GET",
f"{self.sync_endpoint}?since={next_batch_token}&timeout=30000",
f"/sync?since={next_batch_token}&timeout=30000",
access_token=bob_access_token,
# Start the request, then continue on.
await_result=False,
@@ -824,7 +801,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
# Have Bob initiate an initial sync (in order to get a since token)
channel = self.make_request(
"GET",
self.sync_endpoint,
"/sync",
access_token=bob_access_token,
)
self.assertEqual(channel.code, 200, channel.json_body)
@@ -834,7 +811,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
# which we hope will happen as a result of Alice updating their device list.
bob_sync_channel = self.make_request(
"GET",
f"{self.sync_endpoint}?since={next_batch_token}&timeout=1000",
f"/sync?since={next_batch_token}&timeout=1000",
access_token=bob_access_token,
# Start the request, then continue on.
await_result=False,
@@ -873,9 +850,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
)
# Request an initial sync
channel = self.make_request(
"GET", self.sync_endpoint, access_token=alice_access_token
)
channel = self.make_request("GET", "/sync", access_token=alice_access_token)
self.assertEqual(channel.code, 200, channel.json_body)
next_batch = channel.json_body["next_batch"]
@@ -883,7 +858,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
# It won't return until something has happened
incremental_sync_channel = self.make_request(
"GET",
f"{self.sync_endpoint}?since={next_batch}&timeout=30000",
f"/sync?since={next_batch}&timeout=30000",
access_token=alice_access_token,
await_result=False,
)
@@ -913,17 +888,6 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
)
@parameterized_class(
("sync_endpoint", "experimental_features"),
[
("/sync", {}),
(
"/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee",
# Enable sliding sync
{"msc3575_enabled": True},
),
],
)
class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
"""
Tests regarding device one time keys (`device_one_time_keys_count`) changes.
@@ -933,9 +897,6 @@ class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
experimental_features: The experimental features homeserver config to use.
"""
sync_endpoint: str
experimental_features: JsonDict
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
@@ -943,11 +904,6 @@ class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
devices.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["experimental_features"] = self.experimental_features
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.e2e_keys_handler = hs.get_e2e_keys_handler()
@@ -964,9 +920,7 @@ class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
)
# Request an initial sync
channel = self.make_request(
"GET", self.sync_endpoint, access_token=alice_access_token
)
channel = self.make_request("GET", "/sync", access_token=alice_access_token)
self.assertEqual(channel.code, 200, channel.json_body)
# Check for those one time key counts
@@ -1011,9 +965,7 @@ class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
)
# Request an initial sync
channel = self.make_request(
"GET", self.sync_endpoint, access_token=alice_access_token
)
channel = self.make_request("GET", "/sync", access_token=alice_access_token)
self.assertEqual(channel.code, 200, channel.json_body)
# Check for those one time key counts
@@ -1024,17 +976,6 @@ class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
)
@parameterized_class(
("sync_endpoint", "experimental_features"),
[
("/sync", {}),
(
"/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee",
# Enable sliding sync
{"msc3575_enabled": True},
),
],
)
class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
"""
Tests regarding device one time keys (`device_unused_fallback_key_types`) changes.
@@ -1044,9 +985,6 @@ class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
experimental_features: The experimental features homeserver config to use.
"""
sync_endpoint: str
experimental_features: JsonDict
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
@@ -1054,11 +992,6 @@ class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
devices.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["experimental_features"] = self.experimental_features
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = self.hs.get_datastores().main
self.e2e_keys_handler = hs.get_e2e_keys_handler()
@@ -1078,9 +1011,7 @@ class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
)
# Request an initial sync
channel = self.make_request(
"GET", self.sync_endpoint, access_token=alice_access_token
)
channel = self.make_request("GET", "/sync", access_token=alice_access_token)
self.assertEqual(channel.code, 200, channel.json_body)
# Check for those one time key counts
@@ -1122,9 +1053,7 @@ class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
self.assertEqual(fallback_res, ["alg1"], fallback_res)
# Request an initial sync
channel = self.make_request(
"GET", self.sync_endpoint, access_token=alice_access_token
)
channel = self.make_request("GET", "/sync", access_token=alice_access_token)
self.assertEqual(channel.code, 200, channel.json_body)
# Check for the unused fallback key types

Some files were not shown because too many files have changed in this diff Show More