mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-07 01:20:16 +00:00
Compare commits
132 Commits
erikj/ss_p
...
anoa/docke
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bed85d3b16 | ||
|
|
f7a7a0dbd4 | ||
|
|
58deef5eba | ||
|
|
d427403c67 | ||
|
|
e9f9625d6b | ||
|
|
4be3bd41fd | ||
|
|
b3b1db4057 | ||
|
|
6c51f8649d | ||
|
|
69e9b75373 | ||
|
|
5d0514f29b | ||
|
|
4e5410fdae | ||
|
|
12d65a6778 | ||
|
|
1006c12eb2 | ||
|
|
57efc8c03e | ||
|
|
46c885f5b5 | ||
|
|
4b94a056bd | ||
|
|
a5e16a4ab5 | ||
|
|
80ad02e10e | ||
|
|
9512b84a72 | ||
|
|
22aa925523 | ||
|
|
0ab99369a1 | ||
|
|
6ececb8f2a | ||
|
|
2ce7a1edf7 | ||
|
|
ec885ffd33 | ||
|
|
11bc9a1b3a | ||
|
|
c5b379de66 | ||
|
|
adda2a4613 | ||
|
|
5d47138b46 | ||
|
|
d025b5ab50 | ||
|
|
ae6179b382 | ||
|
|
5dd6157972 | ||
|
|
1266138b66 | ||
|
|
24975eca4d | ||
|
|
451a9dc7b9 | ||
|
|
f6a3e5e1c2 | ||
|
|
05576f0b4b | ||
|
|
60aebdb27e | ||
|
|
b1b4b2944d | ||
|
|
bdcc9fa388 | ||
|
|
6a0c21fabd | ||
|
|
f40641c29b | ||
|
|
1bb528ee44 | ||
|
|
165f4ca776 | ||
|
|
475e192cbe | ||
|
|
43040a4051 | ||
|
|
b3e2d10f39 | ||
|
|
a5986ac229 | ||
|
|
006251a5d0 | ||
|
|
422f3ecec1 | ||
|
|
4e90221d87 | ||
|
|
e2610de208 | ||
|
|
e8c8924b81 | ||
|
|
e8e0f0fad7 | ||
|
|
beb7a951f4 | ||
|
|
d34f827ed8 | ||
|
|
9920417723 | ||
|
|
316d635906 | ||
|
|
8bbe66a9b9 | ||
|
|
d4e3ad04cd | ||
|
|
55c0391cc8 | ||
|
|
81e0f57800 | ||
|
|
ae4862c38f | ||
|
|
602956ef64 | ||
|
|
444b565c76 | ||
|
|
8068f31146 | ||
|
|
5210565c12 | ||
|
|
de955293cf | ||
|
|
93889eb2e7 | ||
|
|
ece66ba61c | ||
|
|
ef9ef99f59 | ||
|
|
cfbddc258f | ||
|
|
302534c348 | ||
|
|
f144b4c7e9 | ||
|
|
13dea6949b | ||
|
|
386cabda83 | ||
|
|
f53a3a56e2 | ||
|
|
2fc43e4219 | ||
|
|
b0d2aca164 | ||
|
|
f68e8d0021 | ||
|
|
89e7609f5c | ||
|
|
b89a66f831 | ||
|
|
b066b3aa04 | ||
|
|
e4b0cd87cc | ||
|
|
985b3ab58d | ||
|
|
afc3af7763 | ||
|
|
af2da0e47a | ||
|
|
ac8c9ac50d | ||
|
|
443a9eb335 | ||
|
|
aad26cb93f | ||
|
|
5173741c71 | ||
|
|
75e2c17d2a | ||
|
|
a851f6b237 | ||
|
|
c2e5e9e67c | ||
|
|
07a51d2a56 | ||
|
|
83fc225030 | ||
|
|
a9c0e27eb7 | ||
|
|
faf5b40520 | ||
|
|
af998e6c66 | ||
|
|
61b7c31772 | ||
|
|
3c8a116e1a | ||
|
|
51dd4df0a3 | ||
|
|
8881ad6d4b | ||
|
|
d40bc279ed | ||
|
|
d10872ee75 | ||
|
|
03937a1cae | ||
|
|
285de43e48 | ||
|
|
4900438712 | ||
|
|
cf982d2e32 | ||
|
|
7589565edd | ||
|
|
7ed23e072e | ||
|
|
4ac783549c | ||
|
|
1cb84aaab5 | ||
|
|
9b83fb7c16 | ||
|
|
c5b4be6d07 | ||
|
|
4c66a7cbed | ||
|
|
ebad618bf0 | ||
|
|
16af80b8fb | ||
|
|
e4a1f271b9 | ||
|
|
6b131a99fe | ||
|
|
76f7c91e44 | ||
|
|
b732d13d4c | ||
|
|
596b96411b | ||
|
|
f6c2b0ec2e | ||
|
|
a7fcac5648 | ||
|
|
e06e3c4004 | ||
|
|
60441059a3 | ||
|
|
1b197752b6 | ||
|
|
598a83d005 | ||
|
|
be603de2cb | ||
|
|
62523571ae | ||
|
|
5562a89168 | ||
|
|
59bcbcec0a |
@@ -53,7 +53,7 @@ if not IS_PR:
|
||||
"database": "sqlite",
|
||||
"extras": "all",
|
||||
}
|
||||
for version in ("3.9", "3.10", "3.11", "3.12")
|
||||
for version in ("3.9", "3.10", "3.11", "3.12", "3.13")
|
||||
)
|
||||
|
||||
trial_postgres_tests = [
|
||||
@@ -68,9 +68,9 @@ trial_postgres_tests = [
|
||||
if not IS_PR:
|
||||
trial_postgres_tests.append(
|
||||
{
|
||||
"python-version": "3.12",
|
||||
"python-version": "3.13",
|
||||
"database": "postgres",
|
||||
"postgres-version": "16",
|
||||
"postgres-version": "17",
|
||||
"extras": "all",
|
||||
}
|
||||
)
|
||||
|
||||
2
.github/workflows/docker.yml
vendored
2
.github/workflows/docker.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
||||
run: docker buildx inspect
|
||||
|
||||
- name: Install Cosign
|
||||
uses: sigstore/cosign-installer@v3.6.0
|
||||
uses: sigstore/cosign-installer@v3.7.0
|
||||
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
253
CHANGES.md
253
CHANGES.md
@@ -1,3 +1,256 @@
|
||||
# Synapse 1.118.0 (2024-10-29)
|
||||
|
||||
No significant changes since 1.118.0rc1.
|
||||
|
||||
### Python 3.8 support will be dropped in the next release
|
||||
|
||||
Python 3.8 is now [end-of-life](https://devguide.python.org/versions/). As per our [Deprecation Policy for Platform Dependencies](https://element-hq.github.io/synapse/latest/deprecation_policy.html#policy), Synapse will be dropping support for Python 3.8 in the next release; Synapse 1.119.0.
|
||||
|
||||
Synapse 1.118.x will be the final release to support Python 3.8. If you are running Synapse with Python 3.8, please upgrade before the 1.119.0 release, due in less than one month.
|
||||
|
||||
### Python 3.13 and PostgreSQL 17 support
|
||||
|
||||
On the other end of the spectrum, Synapse 1.118.0 is the first release to support [Python 3.13](https://www.python.org/downloads/release/python-3130/)! [PostgreSQL 17](https://www.postgresql.org/about/news/postgresql-17-released-2936/) is also supported as of this release.
|
||||
|
||||
|
||||
# Synapse 1.118.0rc1 (2024-10-22)
|
||||
|
||||
### Features
|
||||
|
||||
- Added the `display_name_claim` option to the JWT configuration. This option allows specifying the claim key that contains the user's display name in the JWT payload. ([\#17708](https://github.com/element-hq/synapse/issues/17708))
|
||||
- Implement [MSC4210](https://github.com/matrix-org/matrix-spec-proposals/pull/4210): Remove legacy mentions. Contributed by @tulir @ Beeper. ([\#17783](https://github.com/element-hq/synapse/issues/17783))
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- Fix saving of PNG thumbnails, when the original image is in the CMYK color space. ([\#17736](https://github.com/element-hq/synapse/issues/17736))
|
||||
- Fix bug with sliding sync where the server would not return state that was added to the `required_state` config. ([\#17785](https://github.com/element-hq/synapse/issues/17785), [\#17805](https://github.com/element-hq/synapse/issues/17805))
|
||||
- Fix a bug in [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync that would cause rooms to stay forgotten and hidden even after rejoining. ([\#17835](https://github.com/element-hq/synapse/issues/17835))
|
||||
|
||||
### Improved Documentation
|
||||
|
||||
- Clarify when the `user_may_invite` and `user_may_send_3pid_invite` module callbacks are called. ([\#17627](https://github.com/element-hq/synapse/issues/17627))
|
||||
- Correct documentation to refer to the `--config-path` argument instead of `--config-file`. ([\#17802](https://github.com/element-hq/synapse/issues/17802))
|
||||
- Fix typo in `target_cache_memory_usage` docs. ([\#17825](https://github.com/element-hq/synapse/issues/17825))
|
||||
|
||||
### Internal Changes
|
||||
|
||||
- Slight optimization when fetching state/events for Sliding Sync. ([\#17718](https://github.com/element-hq/synapse/issues/17718))
|
||||
- Add Python 3.13 and Postgres 17 to the test matrix. ([\#17752](https://github.com/element-hq/synapse/issues/17752))
|
||||
- Test github token before running release script steps. ([\#17803](https://github.com/element-hq/synapse/issues/17803))
|
||||
- Build debian packages for new Ubuntu versions, and stop building for no longer supported versions. ([\#17824](https://github.com/element-hq/synapse/issues/17824))
|
||||
- Enable the `.org.matrix.msc4028.encrypted_event` push rule by default in accordance with [MSC4028](https://github.com/matrix-org/matrix-spec-proposals/pull/4028). Note that the corresponding experimental feature must still be switched on for this push rule to have any effect. ([\#17826](https://github.com/element-hq/synapse/issues/17826))
|
||||
- Fix some typing issues uncovered by upgrading mypy to 1.11.x. ([\#17842](https://github.com/element-hq/synapse/issues/17842))
|
||||
|
||||
|
||||
|
||||
### Updates to locked dependencies
|
||||
|
||||
* Bump mypy from 1.10.1 to 1.11.2. ([\#17842](https://github.com/element-hq/synapse/issues/17842))
|
||||
* Bump mypy-zope from 1.0.5 to 1.0.7. ([\#17827](https://github.com/element-hq/synapse/issues/17827))
|
||||
* Bump phonenumbers from 8.13.46 to 8.13.47. ([\#17797](https://github.com/element-hq/synapse/issues/17797))
|
||||
* Bump psycopg2 from 2.9.9 to 2.9.10. ([\#17843](https://github.com/element-hq/synapse/issues/17843))
|
||||
* Bump ruff from 0.6.8 to 0.6.9. ([\#17794](https://github.com/element-hq/synapse/issues/17794))
|
||||
* Bump sentry-sdk from 2.14.0 to 2.15.0. ([\#17795](https://github.com/element-hq/synapse/issues/17795))
|
||||
* Bump sentry-sdk from 2.15.0 to 2.16.0. ([\#17829](https://github.com/element-hq/synapse/issues/17829))
|
||||
* Bump sentry-sdk from 2.16.0 to 2.17.0. ([\#17844](https://github.com/element-hq/synapse/issues/17844))
|
||||
* Bump sigstore/cosign-installer from 3.6.0 to 3.7.0. ([\#17798](https://github.com/element-hq/synapse/issues/17798))
|
||||
* Bump tomli from 2.0.1 to 2.0.2. ([\#17796](https://github.com/element-hq/synapse/issues/17796))
|
||||
* Bump types-requests from 2.32.0.20240914 to 2.32.0.20241016. ([\#17841](https://github.com/element-hq/synapse/issues/17841))
|
||||
* Bump types-setuptools from 75.1.0.20240917 to 75.1.0.20241014. ([\#17828](https://github.com/element-hq/synapse/issues/17828))
|
||||
|
||||
# Synapse 1.117.0 (2024-10-15)
|
||||
|
||||
No significant changes since 1.117.0rc1.
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.117.0rc1 (2024-10-08)
|
||||
|
||||
### Features
|
||||
|
||||
- Add config option `redis.password_path`. ([\#17717](https://github.com/element-hq/synapse/issues/17717))
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- Fix a rare bug introduced in v1.29.0 where invalidating a user's access token from a worker could raise an error. ([\#17779](https://github.com/element-hq/synapse/issues/17779))
|
||||
- In the response to `GET /_matrix/client/versions`, set the `unstable_features` flag for [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140) to `false` when server configuration disables support for delayed events. ([\#17780](https://github.com/element-hq/synapse/issues/17780))
|
||||
- Improve input validation and room membership checks in admin redaction API. ([\#17792](https://github.com/element-hq/synapse/issues/17792))
|
||||
|
||||
### Improved Documentation
|
||||
|
||||
- Clarify the docstring of `test_forget_when_not_left`. ([\#17628](https://github.com/element-hq/synapse/issues/17628))
|
||||
- Add documentation note about PYTHONMALLOC for accurate jemalloc memory tracking. Contributed by @hensg. ([\#17709](https://github.com/element-hq/synapse/issues/17709))
|
||||
- Remove spurious "TODO UPDATE ALL THIS" note in the Debian installation docs. ([\#17749](https://github.com/element-hq/synapse/issues/17749))
|
||||
- Explain how load balancing works for `federation_sender_instances`. ([\#17776](https://github.com/element-hq/synapse/issues/17776))
|
||||
|
||||
### Internal Changes
|
||||
|
||||
- Minor performance increase for large accounts using sliding sync. ([\#17751](https://github.com/element-hq/synapse/issues/17751))
|
||||
- Increase performance of the notifier when there are many syncing users. ([\#17765](https://github.com/element-hq/synapse/issues/17765), [\#17766](https://github.com/element-hq/synapse/issues/17766))
|
||||
- Fix performance of streams that don't change often. ([\#17767](https://github.com/element-hq/synapse/issues/17767))
|
||||
- Improve performance of sliding sync connections that do not ask for any rooms. ([\#17768](https://github.com/element-hq/synapse/issues/17768))
|
||||
- Reduce overhead of sliding sync E2EE loops. ([\#17771](https://github.com/element-hq/synapse/issues/17771))
|
||||
- Sliding sync minor performance speed up using new table. ([\#17787](https://github.com/element-hq/synapse/issues/17787))
|
||||
- Sliding sync minor performance improvement by omitting unchanged data from incremental responses. ([\#17788](https://github.com/element-hq/synapse/issues/17788))
|
||||
- Speed up sliding sync when there are many active subscriptions. ([\#17789](https://github.com/element-hq/synapse/issues/17789))
|
||||
- Add missing license headers on new source files. ([\#17799](https://github.com/element-hq/synapse/issues/17799))
|
||||
|
||||
|
||||
|
||||
### Updates to locked dependencies
|
||||
|
||||
* Bump phonenumbers from 8.13.45 to 8.13.46. ([\#17773](https://github.com/element-hq/synapse/issues/17773))
|
||||
* Bump python-multipart from 0.0.10 to 0.0.12. ([\#17772](https://github.com/element-hq/synapse/issues/17772))
|
||||
* Bump regex from 1.10.6 to 1.11.0. ([\#17770](https://github.com/element-hq/synapse/issues/17770))
|
||||
* Bump ruff from 0.6.7 to 0.6.8. ([\#17774](https://github.com/element-hq/synapse/issues/17774))
|
||||
|
||||
# Synapse 1.116.0 (2024-10-01)
|
||||
|
||||
No significant changes since 1.116.0rc2.
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.116.0rc2 (2024-09-26)
|
||||
|
||||
### Features
|
||||
|
||||
- Add implementation of restricting who can overwrite a state event as proposed by [MSC3757](https://github.com/matrix-org/matrix-spec-proposals/pull/3757). ([\#17513](https://github.com/element-hq/synapse/issues/17513))
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.116.0rc1 (2024-09-25)
|
||||
|
||||
### Features
|
||||
|
||||
- Add initial implementation of delayed events as proposed by [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140). ([\#17326](https://github.com/element-hq/synapse/issues/17326))
|
||||
- Add an asynchronous Admin API endpoint [to redact all a user's events](https://element-hq.github.io/synapse/v1.116/admin_api/user_admin_api.html#redact-all-the-events-of-a-user),
|
||||
and [an endpoint to check on the status of that redaction task](https://element-hq.github.io/synapse/v1.116/admin_api/user_admin_api.html#check-the-status-of-a-redaction-process). ([\#17506](https://github.com/element-hq/synapse/issues/17506))
|
||||
- Add support for the `tags` and `not_tags` filters for [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync. ([\#17662](https://github.com/element-hq/synapse/issues/17662))
|
||||
- Guests can use the new media endpoints to download media, as described by [MSC4189](https://github.com/matrix-org/matrix-spec-proposals/pull/4189). ([\#17675](https://github.com/element-hq/synapse/issues/17675))
|
||||
- Add config option `turn_shared_secret_path`. ([\#17690](https://github.com/element-hq/synapse/issues/17690))
|
||||
- Return room tags in [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync account data extension. ([\#17707](https://github.com/element-hq/synapse/issues/17707))
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- Make sure we get up-to-date state information when using the new [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync tables to derive room membership. ([\#17692](https://github.com/element-hq/synapse/issues/17692))
|
||||
- Fix bug where room account data would not correctly be sent down [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync for old rooms. ([\#17695](https://github.com/element-hq/synapse/issues/17695))
|
||||
- Fix a bug in [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync which could prevent /sync from working for certain user accounts. ([\#17727](https://github.com/element-hq/synapse/issues/17727), [\#17733](https://github.com/element-hq/synapse/issues/17733))
|
||||
- Ignore invites from ignored users in Sliding Sync. ([\#17729](https://github.com/element-hq/synapse/issues/17729))
|
||||
- Fix bug in [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync where the server would incorrectly return a negative bump stamp, which caused Element X apps to stop syncing. ([\#17748](https://github.com/element-hq/synapse/issues/17748))
|
||||
|
||||
### Internal Changes
|
||||
|
||||
- Import pydantic objects from the `_pydantic_compat` module.
|
||||
This allows `check_pydantic_models.py` to mock those pydantic objects
|
||||
only in the synapse module, and not interfere with pydantic objects in
|
||||
external dependencies. ([\#17667](https://github.com/element-hq/synapse/issues/17667))
|
||||
- Use [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync tables as a bulk shortcut for getting the max `event_stream_ordering` of rooms. ([\#17693](https://github.com/element-hq/synapse/issues/17693))
|
||||
- Speed up [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) sliding sync requests a bit where there are many room changes. ([\#17696](https://github.com/element-hq/synapse/issues/17696))
|
||||
- Refactor [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) sliding sync filter unit tests so the sliding sync API has better test coverage. ([\#17703](https://github.com/element-hq/synapse/issues/17703))
|
||||
- Fetch `bump_stamp`s more efficiently in [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync. ([\#17723](https://github.com/element-hq/synapse/issues/17723))
|
||||
- Shortcut for checking if certain background updates have completed (utilized in [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync). ([\#17724](https://github.com/element-hq/synapse/issues/17724))
|
||||
- More efficiently fetch rooms for [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync. ([\#17725](https://github.com/element-hq/synapse/issues/17725))
|
||||
- Fix `_bulk_get_max_event_pos` being inefficient. ([\#17728](https://github.com/element-hq/synapse/issues/17728))
|
||||
- Add cache to `get_tags_for_room(...)`. ([\#17730](https://github.com/element-hq/synapse/issues/17730))
|
||||
- Small performance improvement in speeding up [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync. ([\#17731](https://github.com/element-hq/synapse/issues/17731))
|
||||
- Minor speed up of initial [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) sliding sync requests. ([\#17734](https://github.com/element-hq/synapse/issues/17734))
|
||||
- Remove usage of the deprecated `cgi` module, deprecated in Python 3.11 and removed in Python 3.13. ([\#17741](https://github.com/element-hq/synapse/issues/17741))
|
||||
- Fix typing of a variable that is not `Unknown` anymore after updating `treq`. ([\#17744](https://github.com/element-hq/synapse/issues/17744))
|
||||
|
||||
|
||||
|
||||
### Updates to locked dependencies
|
||||
|
||||
* Bump anyhow from 1.0.86 to 1.0.89. ([\#17685](https://github.com/element-hq/synapse/issues/17685), [\#17716](https://github.com/element-hq/synapse/issues/17716))
|
||||
* Bump bytes from 1.7.1 to 1.7.2. ([\#17743](https://github.com/element-hq/synapse/issues/17743))
|
||||
* Bump cryptography from 43.0.0 to 43.0.1. ([\#17689](https://github.com/element-hq/synapse/issues/17689))
|
||||
* Bump idna from 3.8 to 3.10. ([\#17758](https://github.com/element-hq/synapse/issues/17758))
|
||||
* Bump msgpack from 1.0.8 to 1.1.0. ([\#17759](https://github.com/element-hq/synapse/issues/17759))
|
||||
* Bump phonenumbers from 8.13.44 to 8.13.45. ([\#17762](https://github.com/element-hq/synapse/issues/17762))
|
||||
* Bump prometheus-client from 0.20.0 to 0.21.0. ([\#17746](https://github.com/element-hq/synapse/issues/17746))
|
||||
* Bump pyasn1 from 0.6.0 to 0.6.1. ([\#17714](https://github.com/element-hq/synapse/issues/17714))
|
||||
* Bump pyasn1-modules from 0.4.0 to 0.4.1. ([\#17747](https://github.com/element-hq/synapse/issues/17747))
|
||||
* Bump pydantic from 2.8.2 to 2.9.2. ([\#17756](https://github.com/element-hq/synapse/issues/17756))
|
||||
* Bump python-multipart from 0.0.9 to 0.0.10. ([\#17745](https://github.com/element-hq/synapse/issues/17745))
|
||||
* Bump ruff from 0.6.4 to 0.6.7. ([\#17715](https://github.com/element-hq/synapse/issues/17715), [\#17760](https://github.com/element-hq/synapse/issues/17760))
|
||||
* Bump sentry-sdk from 2.13.0 to 2.14.0. ([\#17712](https://github.com/element-hq/synapse/issues/17712))
|
||||
* Bump serde from 1.0.209 to 1.0.210. ([\#17686](https://github.com/element-hq/synapse/issues/17686))
|
||||
* Bump serde_json from 1.0.127 to 1.0.128. ([\#17687](https://github.com/element-hq/synapse/issues/17687))
|
||||
* Bump treq from 23.11.0 to 24.9.1. ([\#17744](https://github.com/element-hq/synapse/issues/17744))
|
||||
* Bump types-pyyaml from 6.0.12.20240808 to 6.0.12.20240917. ([\#17755](https://github.com/element-hq/synapse/issues/17755))
|
||||
* Bump types-requests from 2.32.0.20240712 to 2.32.0.20240914. ([\#17713](https://github.com/element-hq/synapse/issues/17713))
|
||||
* Bump types-setuptools from 74.1.0.20240907 to 75.1.0.20240917. ([\#17757](https://github.com/element-hq/synapse/issues/17757))
|
||||
|
||||
# Synapse 1.115.0 (2024-09-17)
|
||||
|
||||
No significant changes since 1.115.0rc2.
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.115.0rc2 (2024-09-12)
|
||||
|
||||
### Internal Changes
|
||||
|
||||
- Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. ([\#17652](https://github.com/element-hq/synapse/issues/17652))
|
||||
- Speed up sliding sync by reducing amount of data pulled out of the database for large rooms. ([\#17683](https://github.com/element-hq/synapse/issues/17683))
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.115.0rc1 (2024-09-10)
|
||||
|
||||
### Features
|
||||
|
||||
- Improve cross-signing upload when using [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) to use a custom UIA flow stage, with web fallback support. ([\#17509](https://github.com/element-hq/synapse/issues/17509))
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- Return `400 M_BAD_JSON` upon attempting to complete various room actions with a non-local user ID and unknown room ID, rather than an internal server error. ([\#17607](https://github.com/element-hq/synapse/issues/17607))
|
||||
- Fix authenticated media responses using a wrong limit when following redirects over federation. ([\#17626](https://github.com/element-hq/synapse/issues/17626))
|
||||
- Fix bug where we returned the wrong `bump_stamp` for invites in sliding sync response, causing incorrect ordering of invites in the room list. ([\#17674](https://github.com/element-hq/synapse/issues/17674))
|
||||
|
||||
### Improved Documentation
|
||||
|
||||
- Clarify that the admin api resource is only loaded on the main process and not workers. ([\#17590](https://github.com/element-hq/synapse/issues/17590))
|
||||
- Fixed typo in `saml2_config` config [example](https://element-hq.github.io/synapse/latest/usage/configuration/config_documentation.html#saml2_config). ([\#17594](https://github.com/element-hq/synapse/issues/17594))
|
||||
|
||||
### Deprecations and Removals
|
||||
|
||||
- Stabilise [MSC4156](https://github.com/matrix-org/matrix-spec-proposals/pull/4156) by removing the `msc4156_enabled` config setting and defaulting it to `true`. ([\#17650](https://github.com/element-hq/synapse/issues/17650))
|
||||
|
||||
### Internal Changes
|
||||
|
||||
- Update [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) implementation: load the issuer and account management URLs from OIDC discovery. ([\#17407](https://github.com/element-hq/synapse/issues/17407))
|
||||
- Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. ([\#17512](https://github.com/element-hq/synapse/issues/17512), [\#17632](https://github.com/element-hq/synapse/issues/17632), [\#17633](https://github.com/element-hq/synapse/issues/17633), [\#17634](https://github.com/element-hq/synapse/issues/17634), [\#17635](https://github.com/element-hq/synapse/issues/17635), [\#17636](https://github.com/element-hq/synapse/issues/17636), [\#17641](https://github.com/element-hq/synapse/issues/17641), [\#17654](https://github.com/element-hq/synapse/issues/17654), [\#17673](https://github.com/element-hq/synapse/issues/17673))
|
||||
- Store sliding sync per-connection state in the database. ([\#17599](https://github.com/element-hq/synapse/issues/17599), [\#17631](https://github.com/element-hq/synapse/issues/17631))
|
||||
- Make the sliding sync `PerConnectionState` class immutable. ([\#17600](https://github.com/element-hq/synapse/issues/17600))
|
||||
- Replace `isort` and `black` with `ruff`. ([\#17620](https://github.com/element-hq/synapse/issues/17620), [\#17643](https://github.com/element-hq/synapse/issues/17643))
|
||||
- Sliding Sync: Split up `get_room_membership_for_user_at_to_token`. ([\#17629](https://github.com/element-hq/synapse/issues/17629))
|
||||
- Use new database tables for sliding sync. ([\#17630](https://github.com/element-hq/synapse/issues/17630), [\#17649](https://github.com/element-hq/synapse/issues/17649))
|
||||
- Prevent duplicate tags being added to Sliding Sync traces. ([\#17655](https://github.com/element-hq/synapse/issues/17655))
|
||||
- Get `bump_stamp` from [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512) which should be faster. ([\#17658](https://github.com/element-hq/synapse/issues/17658))
|
||||
- Speed up incremental Sliding Sync requests by avoiding extra work. ([\#17665](https://github.com/element-hq/synapse/issues/17665))
|
||||
- Small performance improvement in speeding up sliding sync. ([\#17666](https://github.com/element-hq/synapse/issues/17666), [\#17670](https://github.com/element-hq/synapse/issues/17670), [\#17672](https://github.com/element-hq/synapse/issues/17672))
|
||||
- Speed up sliding sync by reducing number of database calls. ([\#17684](https://github.com/element-hq/synapse/issues/17684))
|
||||
- Speed up sync by pulling out fewer events from the database. ([\#17688](https://github.com/element-hq/synapse/issues/17688))
|
||||
|
||||
|
||||
|
||||
### Updates to locked dependencies
|
||||
|
||||
* Bump authlib from 1.3.1 to 1.3.2. ([\#17679](https://github.com/element-hq/synapse/issues/17679))
|
||||
* Bump idna from 3.7 to 3.8. ([\#17682](https://github.com/element-hq/synapse/issues/17682))
|
||||
* Bump ruff from 0.6.2 to 0.6.4. ([\#17680](https://github.com/element-hq/synapse/issues/17680))
|
||||
* Bump towncrier from 24.7.1 to 24.8.0. ([\#17645](https://github.com/element-hq/synapse/issues/17645))
|
||||
* Bump twisted from 24.7.0rc1 to 24.7.0. ([\#17647](https://github.com/element-hq/synapse/issues/17647))
|
||||
* Bump types-pillow from 10.2.0.20240520 to 10.2.0.20240822. ([\#17644](https://github.com/element-hq/synapse/issues/17644))
|
||||
* Bump types-psycopg2 from 2.9.21.20240417 to 2.9.21.20240819. ([\#17646](https://github.com/element-hq/synapse/issues/17646))
|
||||
* Bump types-setuptools from 71.1.0.20240818 to 74.1.0.20240907. ([\#17681](https://github.com/element-hq/synapse/issues/17681))
|
||||
|
||||
# Synapse 1.114.0 (2024-09-02)
|
||||
|
||||
This release enables support for
|
||||
|
||||
32
Cargo.lock
generated
32
Cargo.lock
generated
@@ -13,9 +13,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.86"
|
||||
version = "1.0.90"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
|
||||
checksum = "37bf3594c4c988a53154954629820791dde498571819ae4ca50ca811e060cc95"
|
||||
|
||||
[[package]]
|
||||
name = "arc-swap"
|
||||
@@ -67,9 +67,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.7.1"
|
||||
version = "1.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
|
||||
checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3"
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
@@ -444,9 +444,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.10.6"
|
||||
version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
|
||||
checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
@@ -456,9 +456,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex-automata"
|
||||
version = "0.4.6"
|
||||
version = "0.4.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea"
|
||||
checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
@@ -467,9 +467,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.8.3"
|
||||
version = "0.8.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
|
||||
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
@@ -485,18 +485,18 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.209"
|
||||
version = "1.0.210"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09"
|
||||
checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.209"
|
||||
version = "1.0.210"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170"
|
||||
checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -505,9 +505,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.127"
|
||||
version = "1.0.132"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad"
|
||||
checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"memchr",
|
||||
|
||||
@@ -158,7 +158,7 @@ it:
|
||||
|
||||
We **strongly** recommend using a CAPTCHA, particularly if your homeserver is exposed to
|
||||
the public internet. Without it, anyone can freely register accounts on your homeserver.
|
||||
This can be exploited by attackers to create spambots targetting the rest of the Matrix
|
||||
This can be exploited by attackers to create spambots targeting the rest of the Matrix
|
||||
federation.
|
||||
|
||||
Your new user name will be formed partly from the ``server_name``, and partly
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
MSC3861: load the issuer and account management URLs from OIDC discovery.
|
||||
@@ -1 +0,0 @@
|
||||
Improve cross-signing upload when using [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) to use a custom UIA flow stage, with web fallback support.
|
||||
@@ -1 +0,0 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
@@ -1 +0,0 @@
|
||||
Clarify that the admin api resource is only loaded on the main process and not workers.
|
||||
@@ -1 +0,0 @@
|
||||
Fixed typo in `saml2_config` config [example](https://element-hq.github.io/synapse/latest/usage/configuration/config_documentation.html#saml2_config).
|
||||
@@ -1 +0,0 @@
|
||||
Store sliding sync per-connection state in the database.
|
||||
@@ -1 +0,0 @@
|
||||
Make the sliding sync `PerConnectionState` class immutable.
|
||||
@@ -1 +0,0 @@
|
||||
Return `400 M_BAD_JSON` upon attempting to complete various room actions with a non-local user ID and unknown room ID, rather than an internal server error.
|
||||
@@ -1 +0,0 @@
|
||||
Replace `isort` and `black with `ruff`.
|
||||
@@ -1 +0,0 @@
|
||||
Fix authenticated media responses using a wrong limit when following redirects over federation.
|
||||
@@ -1 +0,0 @@
|
||||
Sliding Sync: Split up `get_room_membership_for_user_at_to_token`.
|
||||
@@ -1 +0,0 @@
|
||||
Use new database tables for sliding sync.
|
||||
@@ -1 +0,0 @@
|
||||
Store sliding sync per-connection state in the database.
|
||||
@@ -1 +0,0 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
@@ -1 +0,0 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
@@ -1 +0,0 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
@@ -1 +0,0 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
@@ -1 +0,0 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
@@ -1 +0,0 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
@@ -1 +0,0 @@
|
||||
Replace `isort` and `black with `ruff`.
|
||||
@@ -1 +0,0 @@
|
||||
Use new database tables for sliding sync.
|
||||
@@ -1 +0,0 @@
|
||||
Stabilise [MSC4156](https://github.com/matrix-org/matrix-spec-proposals/pull/4156) by removing the `msc4156_enabled` config setting and defaulting it to `true`.
|
||||
@@ -1 +0,0 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
@@ -1 +0,0 @@
|
||||
Prevent duplicate tags being added to Sliding Sync traces.
|
||||
@@ -1 +0,0 @@
|
||||
Get `bump_stamp` from [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512) which should be faster.
|
||||
@@ -1 +0,0 @@
|
||||
Speed up incremental Sliding Sync requests by avoiding extra work.
|
||||
@@ -1 +0,0 @@
|
||||
Small performance improvement in speeding up sliding sync.
|
||||
@@ -1 +0,0 @@
|
||||
Small performance improvement in speeding up sliding sync.
|
||||
@@ -1 +0,0 @@
|
||||
Small performance improvement in speeding up sliding sync.
|
||||
@@ -1 +0,0 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
@@ -1 +0,0 @@
|
||||
Fix bug where we returned the wrong `bump_stamp` for invites in sliding sync response, causing incorrect ordering of invites in the room list.
|
||||
@@ -1 +0,0 @@
|
||||
Speed up sliding sync by reducing number of database calls.
|
||||
@@ -1 +0,0 @@
|
||||
Speed up sync by pulling out fewer events from the database.
|
||||
1
changelog.d/17786.misc
Normal file
1
changelog.d/17786.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add a test for downloading and thumbnailing a CMYK JPEG.
|
||||
1
changelog.d/17830.misc
Normal file
1
changelog.d/17830.misc
Normal file
@@ -0,0 +1 @@
|
||||
Include the destination in the error of 'Destination mismatch' on federation requests.
|
||||
2
changelog.d/17847.bugfix
Normal file
2
changelog.d/17847.bugfix
Normal file
@@ -0,0 +1,2 @@
|
||||
Fix a bug in the admin redact endpoint where the background task would not run if a worker was specified in
|
||||
the config option `run_background_tasks_on`.
|
||||
1
changelog.d/17861.bugfix
Normal file
1
changelog.d/17861.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix detection when the built Rust library was outdated when using source installations.
|
||||
1
changelog.d/17887.misc
Normal file
1
changelog.d/17887.misc
Normal file
@@ -0,0 +1 @@
|
||||
Bump the default Python version in the Synapse Dockerfile from 3.11 -> 3.12.
|
||||
@@ -20,8 +20,8 @@
|
||||
#
|
||||
|
||||
import argparse
|
||||
import cgi
|
||||
import datetime
|
||||
import html
|
||||
import json
|
||||
import urllib.request
|
||||
from typing import List
|
||||
@@ -85,7 +85,7 @@ def make_graph(pdus: List[dict], filename_prefix: str) -> None:
|
||||
"name": name,
|
||||
"type": pdu.get("pdu_type"),
|
||||
"state_key": pdu.get("state_key"),
|
||||
"content": cgi.escape(json.dumps(pdu.get("content")), quote=True),
|
||||
"content": html.escape(json.dumps(pdu.get("content")), quote=True),
|
||||
"time": t,
|
||||
"depth": pdu.get("depth"),
|
||||
}
|
||||
|
||||
60
debian/changelog
vendored
60
debian/changelog
vendored
@@ -1,3 +1,63 @@
|
||||
matrix-synapse-py3 (1.118.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.118.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 29 Oct 2024 15:29:53 +0100
|
||||
|
||||
matrix-synapse-py3 (1.118.0~rc1) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.118.0rc1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 22 Oct 2024 11:48:14 +0100
|
||||
|
||||
matrix-synapse-py3 (1.117.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.117.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 15 Oct 2024 10:46:30 +0100
|
||||
|
||||
matrix-synapse-py3 (1.117.0~rc1) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.117.0rc1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 08 Oct 2024 14:37:11 +0100
|
||||
|
||||
matrix-synapse-py3 (1.116.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.116.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 01 Oct 2024 11:14:07 +0100
|
||||
|
||||
matrix-synapse-py3 (1.116.0~rc2) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.116.0rc2.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Thu, 26 Sep 2024 13:28:43 +0000
|
||||
|
||||
matrix-synapse-py3 (1.116.0~rc1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.116.0rc1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Wed, 25 Sep 2024 09:34:07 +0000
|
||||
|
||||
matrix-synapse-py3 (1.115.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.115.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 17 Sep 2024 14:32:10 +0100
|
||||
|
||||
matrix-synapse-py3 (1.115.0~rc2) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.115.0rc2.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Thu, 12 Sep 2024 11:10:15 +0100
|
||||
|
||||
matrix-synapse-py3 (1.115.0~rc1) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.115.0rc1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 10 Sep 2024 08:39:09 -0600
|
||||
|
||||
matrix-synapse-py3 (1.114.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.114.0.
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# `poetry export | pip install -r /dev/stdin`, but beware: we have experienced bugs in
|
||||
# in `poetry export` in the past.
|
||||
|
||||
ARG PYTHON_VERSION=3.11
|
||||
ARG PYTHON_VERSION=3.12
|
||||
|
||||
###
|
||||
### Stage 0: generate requirements.txt
|
||||
|
||||
@@ -111,6 +111,9 @@ server_notices:
|
||||
system_mxid_avatar_url: ""
|
||||
room_name: "Server Alert"
|
||||
|
||||
# Enable delayed events (msc4140)
|
||||
max_event_delay_duration: 24h
|
||||
|
||||
|
||||
# Disable sync cache so that initial `/sync` requests are up-to-date.
|
||||
caches:
|
||||
|
||||
@@ -1361,3 +1361,86 @@ Returns a `404` HTTP status code if no user was found, with a response body like
|
||||
```
|
||||
|
||||
_Added in Synapse 1.72.0._
|
||||
|
||||
|
||||
## Redact all the events of a user
|
||||
|
||||
This endpoint allows an admin to redact the events of a given user. There are no restrictions on redactions for a
|
||||
local user. By default, we puppet the user who sent the message to redact it themselves. Redactions for non-local users are issued using the admin user, and will fail in rooms where the admin user is not admin/does not have the specified power level to issue redactions.
|
||||
|
||||
The API is
|
||||
```
|
||||
POST /_synapse/admin/v1/user/$user_id/redact
|
||||
|
||||
{
|
||||
"rooms": ["!roomid1", "!roomid2"]
|
||||
}
|
||||
```
|
||||
If an empty list is provided as the key for `rooms`, all events in all the rooms the user is member of will be redacted,
|
||||
otherwise all the events in the rooms provided in the request will be redacted.
|
||||
|
||||
The API starts redaction process running, and returns immediately with a JSON body with
|
||||
a redact id which can be used to query the status of the redaction process:
|
||||
|
||||
```json
|
||||
{
|
||||
"redact_id": "<opaque id>"
|
||||
}
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
The following parameters should be set in the URL:
|
||||
|
||||
- `user_id` - The fully qualified MXID of the user: for example, `@user:server.com`.
|
||||
|
||||
The following JSON body parameter must be provided:
|
||||
|
||||
- `rooms` - A list of rooms to redact the user's events in. If an empty list is provided all events in all rooms
|
||||
the user is a member of will be redacted
|
||||
|
||||
_Added in Synapse 1.116.0._
|
||||
|
||||
The following JSON body parameters are optional:
|
||||
|
||||
- `reason` - Reason the redaction is being requested, ie "spam", "abuse", etc. This will be included in each redaction event, and be visible to users.
|
||||
- `limit` - a limit on the number of the user's events to search for ones that can be redacted (events are redacted newest to oldest) in each room, defaults to 1000 if not provided
|
||||
|
||||
|
||||
## Check the status of a redaction process
|
||||
|
||||
It is possible to query the status of the background task for redacting a user's events.
|
||||
The status can be queried up to 24 hours after completion of the task,
|
||||
or until Synapse is restarted (whichever happens first).
|
||||
|
||||
The API is:
|
||||
|
||||
```
|
||||
GET /_synapse/admin/v1/user/redact_status/$redact_id
|
||||
```
|
||||
|
||||
A response body like the following is returned:
|
||||
|
||||
```
|
||||
{
|
||||
"status": "active",
|
||||
"failed_redactions": [],
|
||||
}
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
The following parameters should be set in the URL:
|
||||
|
||||
* `redact_id` - string - The ID for this redaction process, provided when the redaction was requested.
|
||||
|
||||
|
||||
**Response**
|
||||
|
||||
The following fields are returned in the JSON response body:
|
||||
|
||||
- `status` - string - one of scheduled/active/completed/failed, indicating the status of the redaction job
|
||||
- `failed_redactions` - dictionary - the keys of the dict are event ids the process was unable to redact, if any, and the values are
|
||||
the corresponding error that caused the redaction to fail
|
||||
|
||||
_Added in Synapse 1.116.0._
|
||||
@@ -76,8 +76,9 @@ _Changed in Synapse v1.62.0: `synapse.module_api.NOT_SPAM` and `synapse.module_a
|
||||
async def user_may_invite(inviter: str, invitee: str, room_id: str) -> Union["synapse.module_api.NOT_SPAM", "synapse.module_api.errors.Codes", bool]
|
||||
```
|
||||
|
||||
Called when processing an invitation. Both inviter and invitee are
|
||||
represented by their Matrix user ID (e.g. `@alice:example.com`).
|
||||
Called when processing an invitation, both when one is created locally or when
|
||||
receiving an invite over federation. Both inviter and invitee are represented by
|
||||
their Matrix user ID (e.g. `@alice:example.com`).
|
||||
|
||||
|
||||
The callback must return one of:
|
||||
@@ -112,7 +113,9 @@ async def user_may_send_3pid_invite(
|
||||
```
|
||||
|
||||
Called when processing an invitation using a third-party identifier (also called a 3PID,
|
||||
e.g. an email address or a phone number).
|
||||
e.g. an email address or a phone number). It is only called when a 3PID invite is created
|
||||
locally - not when one is received in a room over federation. If the 3PID is already associated
|
||||
with a Matrix ID, the spam check will go through the `user_may_invite` callback instead.
|
||||
|
||||
The inviter is represented by their Matrix user ID (e.g. `@alice:example.com`), and the
|
||||
invitee is represented by its medium (e.g. "email") and its address
|
||||
|
||||
@@ -52,8 +52,6 @@ architecture via <https://packages.matrix.org/debian/>.
|
||||
|
||||
To install the latest release:
|
||||
|
||||
TODO UPDATE ALL THIS
|
||||
|
||||
```sh
|
||||
sudo apt install -y lsb-release wget apt-transport-https
|
||||
sudo wget -O /usr/share/keyrings/matrix-org-archive-keyring.gpg https://packages.matrix.org/debian/matrix-org-archive-keyring.gpg
|
||||
@@ -316,7 +314,7 @@ sudo dnf group install "Development Tools"
|
||||
|
||||
*Note: The term "RHEL" below refers to both Red Hat Enterprise Linux and Rocky Linux. The distributions are 1:1 binary compatible.*
|
||||
|
||||
It's recommended to use the latest Python versions.
|
||||
It's recommended to use the latest Python versions.
|
||||
|
||||
RHEL 8 in particular ships with Python 3.6 by default which is EOL and therefore no longer supported by Synapse. RHEL 9 ship with Python 3.9 which is still supported by the Python core team as of this writing. However, newer Python versions provide significant performance improvements and they're available in official distributions' repositories. Therefore it's recommended to use them.
|
||||
|
||||
@@ -346,7 +344,7 @@ dnf install python3.12 python3.12-devel
|
||||
```
|
||||
Finally, install common prerequisites
|
||||
```bash
|
||||
dnf install libicu libicu-devel libpq5 libpq5-devel lz4 pkgconf
|
||||
dnf install libicu libicu-devel libpq5 libpq5-devel lz4 pkgconf
|
||||
dnf group install "Development Tools"
|
||||
```
|
||||
###### Using venv module instead of virtualenv command
|
||||
@@ -355,7 +353,7 @@ It's recommended to use Python venv module directly rather than the virtualenv c
|
||||
* On RHEL 9, virtualenv is only available on [EPEL](https://docs.fedoraproject.org/en-US/epel/).
|
||||
* On RHEL 8, virtualenv is based on Python 3.6. It does not support creating 3.11/3.12 virtual environments.
|
||||
|
||||
Here's an example of creating Python 3.12 virtual environment and installing Synapse from PyPI.
|
||||
Here's an example of creating Python 3.12 virtual environment and installing Synapse from PyPI.
|
||||
|
||||
```bash
|
||||
mkdir -p ~/synapse
|
||||
|
||||
@@ -255,6 +255,8 @@ line to `/etc/default/matrix-synapse`:
|
||||
|
||||
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
|
||||
|
||||
*Note*: You may need to set `PYTHONMALLOC=malloc` to ensure that `jemalloc` can accurately calculate memory usage. By default, Python uses its internal small-object allocator, which may interfere with jemalloc's ability to track memory consumption correctly. This could prevent the [cache_autotuning](../configuration/config_documentation.md#caches-and-associated-values) feature from functioning as expected, as the Python allocator may not reach the memory threshold set by `max_cache_memory_usage`, thus not triggering the cache eviction process.
|
||||
|
||||
This made a significant difference on Python 2.7 - it's unclear how
|
||||
much of an improvement it provides on Python 3.x.
|
||||
|
||||
|
||||
@@ -761,6 +761,19 @@ email:
|
||||
password_reset: "[%(server_name)s] Password reset"
|
||||
email_validation: "[%(server_name)s] Validate your email"
|
||||
```
|
||||
---
|
||||
### `max_event_delay_duration`
|
||||
|
||||
The maximum allowed duration by which sent events can be delayed, as per
|
||||
[MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140).
|
||||
Must be a positive value if set.
|
||||
|
||||
Defaults to no duration (`null`), which disallows sending delayed events.
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
max_event_delay_duration: 24h
|
||||
```
|
||||
|
||||
## Homeserver blocking
|
||||
Useful options for Synapse admins.
|
||||
@@ -1421,7 +1434,7 @@ number of entries that can be stored.
|
||||
Please see the [Config Conventions](#config-conventions) for information on how to specify memory size and cache expiry
|
||||
durations.
|
||||
* `max_cache_memory_usage` sets a ceiling on how much memory the cache can use before caches begin to be continuously evicted.
|
||||
They will continue to be evicted until the memory usage drops below the `target_memory_usage`, set in
|
||||
They will continue to be evicted until the memory usage drops below the `target_cache_memory_usage`, set in
|
||||
the setting below, or until the `min_cache_ttl` is hit. There is no default value for this option.
|
||||
* `target_cache_memory_usage` sets a rough target for the desired memory usage of the caches. There is no default value
|
||||
for this option.
|
||||
@@ -2315,6 +2328,22 @@ Example configuration:
|
||||
```yaml
|
||||
turn_shared_secret: "YOUR_SHARED_SECRET"
|
||||
```
|
||||
---
|
||||
### `turn_shared_secret_path`
|
||||
|
||||
An alternative to [`turn_shared_secret`](#turn_shared_secret):
|
||||
allows the shared secret to be specified in an external file.
|
||||
|
||||
The file should be a plain text file, containing only the shared secret.
|
||||
Synapse reads the shared secret from the given file once at startup.
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
turn_shared_secret_path: /path/to/secrets/file
|
||||
```
|
||||
|
||||
_Added in Synapse 1.116.0._
|
||||
|
||||
---
|
||||
### `turn_username` and `turn_password`
|
||||
|
||||
@@ -3693,6 +3722,8 @@ Additional sub-options for this setting include:
|
||||
Required if `enabled` is set to true.
|
||||
* `subject_claim`: Name of the claim containing a unique identifier for the user.
|
||||
Optional, defaults to `sub`.
|
||||
* `display_name_claim`: Name of the claim containing the display name for the user. Optional.
|
||||
If provided, the display name will be set to the value of this claim upon first login.
|
||||
* `issuer`: The issuer to validate the "iss" claim against. Optional. If provided the
|
||||
"iss" claim will be required and validated for all JSON web tokens.
|
||||
* `audiences`: A list of audiences to validate the "aud" claim against. Optional.
|
||||
@@ -3707,6 +3738,7 @@ jwt_config:
|
||||
secret: "provided-by-your-issuer"
|
||||
algorithm: "provided-by-your-issuer"
|
||||
subject_claim: "name_of_claim"
|
||||
display_name_claim: "name_of_claim"
|
||||
issuer: "provided-by-your-issuer"
|
||||
audiences:
|
||||
- "provided-by-your-issuer"
|
||||
@@ -4339,7 +4371,13 @@ It is possible to scale the processes that handle sending outbound federation re
|
||||
by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding it's [`worker_name`](#worker_name) to
|
||||
a `federation_sender_instances` map. Doing so will remove handling of this function from
|
||||
the main process. Multiple workers can be added to this map, in which case the work is
|
||||
balanced across them.
|
||||
balanced across them.
|
||||
|
||||
The way that the load balancing works is any outbound federation request will be assigned
|
||||
to a federation sender worker based on the hash of the destination server name. This
|
||||
means that all requests being sent to the same destination will be processed by the same
|
||||
worker instance. Multiple `federation_sender_instances` are useful if there is a federation
|
||||
with multiple servers.
|
||||
|
||||
This configuration setting must be shared between all workers handling federation
|
||||
sending, and if changed all federation sender workers must be stopped at the same time
|
||||
@@ -4489,6 +4527,9 @@ This setting has the following sub-options:
|
||||
* `path`: The full path to a local Unix socket file. **If this is used, `host` and
|
||||
`port` are ignored.** Defaults to `/tmp/redis.sock'
|
||||
* `password`: Optional password if configured on the Redis instance.
|
||||
* `password_path`: Alternative to `password`, reading the password from an
|
||||
external file. The file should be a plain text file, containing only the
|
||||
password. Synapse reads the password from the given file once at startup.
|
||||
* `dbid`: Optional redis dbid if needs to connect to specific redis logical db.
|
||||
* `use_tls`: Whether to use tls connection. Defaults to false.
|
||||
* `certificate_file`: Optional path to the certificate file
|
||||
@@ -4502,13 +4543,16 @@ This setting has the following sub-options:
|
||||
|
||||
_Changed in Synapse 1.85.0: Added path option to use a local Unix socket_
|
||||
|
||||
_Changed in Synapse 1.116.0: Added password\_path_
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
redis:
|
||||
enabled: true
|
||||
host: localhost
|
||||
port: 6379
|
||||
password: <secret_password>
|
||||
password_path: <path_to_the_password_file>
|
||||
# OR password: <secret_password>
|
||||
dbid: <dbid>
|
||||
#use_tls: True
|
||||
#certificate_file: <path_to_the_certificate_file>
|
||||
|
||||
@@ -177,11 +177,11 @@ The following applies to Synapse installations that have been installed from sou
|
||||
|
||||
You can start the main Synapse process with Poetry by running the following command:
|
||||
```console
|
||||
poetry run synapse_homeserver --config-file [your homeserver.yaml]
|
||||
poetry run synapse_homeserver --config-path [your homeserver.yaml]
|
||||
```
|
||||
For worker setups, you can run the following command
|
||||
```console
|
||||
poetry run synapse_worker --config-file [your homeserver.yaml] --config-file [your worker.yaml]
|
||||
poetry run synapse_worker --config-path [your homeserver.yaml] --config-path [your worker.yaml]
|
||||
```
|
||||
## Available worker applications
|
||||
|
||||
@@ -290,6 +290,7 @@ information.
|
||||
Additionally, the following REST endpoints can be handled for GET requests:
|
||||
|
||||
^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/
|
||||
^/_matrix/client/unstable/org.matrix.msc4140/delayed_events
|
||||
|
||||
Pagination requests can also be handled, but all requests for a given
|
||||
room must be routed to the same instance. Additionally, care must be taken to
|
||||
|
||||
812
poetry.lock
generated
812
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
|
||||
|
||||
[tool.poetry]
|
||||
name = "matrix-synapse"
|
||||
version = "1.114.0"
|
||||
version = "1.118.0"
|
||||
description = "Homeserver for the Matrix decentralised comms protocol"
|
||||
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
||||
license = "AGPL-3.0-or-later"
|
||||
@@ -320,7 +320,7 @@ all = [
|
||||
# failing on new releases. Keeping lower bounds loose here means that dependabot
|
||||
# can bump versions without having to update the content-hash in the lockfile.
|
||||
# This helps prevents merge conflicts when running a batch of dependabot updates.
|
||||
ruff = "0.6.4"
|
||||
ruff = "0.6.9"
|
||||
# Type checking only works with the pydantic.v1 compat module from pydantic v2
|
||||
pydantic = "^2"
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ fn bench_match_exact(b: &mut Bencher) {
|
||||
true,
|
||||
vec![],
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -105,6 +106,7 @@ fn bench_match_word(b: &mut Bencher) {
|
||||
true,
|
||||
vec![],
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -150,6 +152,7 @@ fn bench_match_word_miss(b: &mut Bencher) {
|
||||
true,
|
||||
vec![],
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -195,6 +198,7 @@ fn bench_eval_message(b: &mut Bencher) {
|
||||
true,
|
||||
vec![],
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -205,6 +209,7 @@ fn bench_eval_message(b: &mut Bencher) {
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
);
|
||||
|
||||
b.iter(|| eval.run(&rules, Some("bob"), Some("person")));
|
||||
|
||||
@@ -81,7 +81,7 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[
|
||||
))]),
|
||||
actions: Cow::Borrowed(&[Action::Notify]),
|
||||
default: true,
|
||||
default_enabled: false,
|
||||
default_enabled: true,
|
||||
},
|
||||
PushRule {
|
||||
rule_id: Cow::Borrowed("global/override/.m.rule.suppress_notices"),
|
||||
|
||||
@@ -105,6 +105,9 @@ pub struct PushRuleEvaluator {
|
||||
/// If MSC3931 (room version feature flags) is enabled. Usually controlled by the same
|
||||
/// flag as MSC1767 (extensible events core).
|
||||
msc3931_enabled: bool,
|
||||
|
||||
// If MSC4210 (remove legacy mentions) is enabled.
|
||||
msc4210_enabled: bool,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
@@ -122,6 +125,7 @@ impl PushRuleEvaluator {
|
||||
related_event_match_enabled,
|
||||
room_version_feature_flags,
|
||||
msc3931_enabled,
|
||||
msc4210_enabled,
|
||||
))]
|
||||
pub fn py_new(
|
||||
flattened_keys: BTreeMap<String, JsonValue>,
|
||||
@@ -133,6 +137,7 @@ impl PushRuleEvaluator {
|
||||
related_event_match_enabled: bool,
|
||||
room_version_feature_flags: Vec<String>,
|
||||
msc3931_enabled: bool,
|
||||
msc4210_enabled: bool,
|
||||
) -> Result<Self, Error> {
|
||||
let body = match flattened_keys.get("content.body") {
|
||||
Some(JsonValue::Value(SimpleJsonValue::Str(s))) => s.clone().into_owned(),
|
||||
@@ -150,6 +155,7 @@ impl PushRuleEvaluator {
|
||||
related_event_match_enabled,
|
||||
room_version_feature_flags,
|
||||
msc3931_enabled,
|
||||
msc4210_enabled,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -176,7 +182,8 @@ impl PushRuleEvaluator {
|
||||
|
||||
// For backwards-compatibility the legacy mention rules are disabled
|
||||
// if the event contains the 'm.mentions' property.
|
||||
if self.has_mentions
|
||||
// Additionally, MSC4210 always disables the legacy rules.
|
||||
if (self.has_mentions || self.msc4210_enabled)
|
||||
&& (rule_id == "global/override/.m.rule.contains_display_name"
|
||||
|| rule_id == "global/content/.m.rule.contains_user_name"
|
||||
|| rule_id == "global/override/.m.rule.roomnotif")
|
||||
@@ -526,6 +533,7 @@ fn push_rule_evaluator() {
|
||||
true,
|
||||
vec![],
|
||||
true,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -555,6 +563,7 @@ fn test_requires_room_version_supports_condition() {
|
||||
false,
|
||||
flags,
|
||||
true,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -582,7 +591,7 @@ fn test_requires_room_version_supports_condition() {
|
||||
};
|
||||
let rules = PushRules::new(vec![custom_rule]);
|
||||
result = evaluator.run(
|
||||
&FilteredPushRules::py_new(rules, BTreeMap::new(), true, false, true, false),
|
||||
&FilteredPushRules::py_new(rules, BTreeMap::new(), true, false, true, false, false),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
@@ -534,6 +534,7 @@ pub struct FilteredPushRules {
|
||||
msc3381_polls_enabled: bool,
|
||||
msc3664_enabled: bool,
|
||||
msc4028_push_encrypted_events: bool,
|
||||
msc4210_enabled: bool,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
@@ -546,6 +547,7 @@ impl FilteredPushRules {
|
||||
msc3381_polls_enabled: bool,
|
||||
msc3664_enabled: bool,
|
||||
msc4028_push_encrypted_events: bool,
|
||||
msc4210_enabled: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
push_rules,
|
||||
@@ -554,6 +556,7 @@ impl FilteredPushRules {
|
||||
msc3381_polls_enabled,
|
||||
msc3664_enabled,
|
||||
msc4028_push_encrypted_events,
|
||||
msc4210_enabled,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -596,6 +599,14 @@ impl FilteredPushRules {
|
||||
return false;
|
||||
}
|
||||
|
||||
if self.msc4210_enabled
|
||||
&& (rule.rule_id == "global/override/.m.rule.contains_display_name"
|
||||
|| rule.rule_id == "global/content/.m.rule.contains_user_name"
|
||||
|| rule.rule_id == "global/override/.m.rule.roomnotif")
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
})
|
||||
.map(|r| {
|
||||
|
||||
@@ -32,8 +32,8 @@ DISTS = (
|
||||
"debian:sid", # (EOL not specified yet) (our EOL forced by Python 3.11 is 2027-10-24)
|
||||
"ubuntu:focal", # 20.04 LTS (EOL 2025-04) (our EOL forced by Python 3.8 is 2024-10-14)
|
||||
"ubuntu:jammy", # 22.04 LTS (EOL 2027-04) (our EOL forced by Python 3.10 is 2026-10-04)
|
||||
"ubuntu:lunar", # 23.04 (EOL 2024-01) (our EOL forced by Python 3.11 is 2027-10-24)
|
||||
"ubuntu:mantic", # 23.10 (EOL 2024-07) (our EOL forced by Python 3.11 is 2027-10-24)
|
||||
"ubuntu:noble", # 24.04 LTS (EOL 2029-06)
|
||||
"ubuntu:oracular", # 24.10 (EOL 2025-07)
|
||||
"debian:trixie", # (EOL not specified yet)
|
||||
)
|
||||
|
||||
|
||||
@@ -45,7 +45,6 @@ import traceback
|
||||
import unittest.mock
|
||||
from contextlib import contextmanager
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
@@ -57,30 +56,17 @@ from typing import (
|
||||
)
|
||||
|
||||
from parameterized import parameterized
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import (
|
||||
BaseModel as PydanticBaseModel,
|
||||
conbytes,
|
||||
confloat,
|
||||
conint,
|
||||
constr,
|
||||
)
|
||||
from pydantic.v1.typing import get_args
|
||||
else:
|
||||
from pydantic import (
|
||||
BaseModel as PydanticBaseModel,
|
||||
conbytes,
|
||||
confloat,
|
||||
conint,
|
||||
constr,
|
||||
)
|
||||
from pydantic.typing import get_args
|
||||
|
||||
from typing_extensions import ParamSpec
|
||||
|
||||
from synapse._pydantic_compat import (
|
||||
BaseModel as PydanticBaseModel,
|
||||
conbytes,
|
||||
confloat,
|
||||
conint,
|
||||
constr,
|
||||
get_args,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CONSTRAINED_TYPE_FACTORIES_WITH_STRICT_FLAG: List[Callable] = [
|
||||
@@ -183,22 +169,16 @@ def monkeypatch_pydantic() -> Generator[None, None, None]:
|
||||
# Most Synapse code ought to import the patched objects directly from
|
||||
# `pydantic`. But we also patch their containing modules `pydantic.main` and
|
||||
# `pydantic.types` for completeness.
|
||||
patch_basemodel1 = unittest.mock.patch(
|
||||
"pydantic.BaseModel", new=PatchedBaseModel
|
||||
patch_basemodel = unittest.mock.patch(
|
||||
"synapse._pydantic_compat.BaseModel", new=PatchedBaseModel
|
||||
)
|
||||
patch_basemodel2 = unittest.mock.patch(
|
||||
"pydantic.main.BaseModel", new=PatchedBaseModel
|
||||
)
|
||||
patches.enter_context(patch_basemodel1)
|
||||
patches.enter_context(patch_basemodel2)
|
||||
patches.enter_context(patch_basemodel)
|
||||
for factory in CONSTRAINED_TYPE_FACTORIES_WITH_STRICT_FLAG:
|
||||
wrapper: Callable = make_wrapper(factory)
|
||||
patch1 = unittest.mock.patch(f"pydantic.{factory.__name__}", new=wrapper)
|
||||
patch2 = unittest.mock.patch(
|
||||
f"pydantic.types.{factory.__name__}", new=wrapper
|
||||
patch = unittest.mock.patch(
|
||||
f"synapse._pydantic_compat.{factory.__name__}", new=wrapper
|
||||
)
|
||||
patches.enter_context(patch1)
|
||||
patches.enter_context(patch2)
|
||||
patches.enter_context(patch)
|
||||
yield
|
||||
|
||||
|
||||
|
||||
@@ -220,9 +220,11 @@ test_packages=(
|
||||
./tests/msc3874
|
||||
./tests/msc3890
|
||||
./tests/msc3391
|
||||
./tests/msc3757
|
||||
./tests/msc3930
|
||||
./tests/msc3902
|
||||
./tests/msc3967
|
||||
./tests/msc4140
|
||||
)
|
||||
|
||||
# Enable dirty runs, so tests will reuse the same container where possible.
|
||||
|
||||
@@ -360,7 +360,7 @@ def is_cacheable(
|
||||
# For a type alias, check if the underlying real type is cachable.
|
||||
return is_cacheable(mypy.types.get_proper_type(rt), signature, verbose)
|
||||
|
||||
elif isinstance(rt, UninhabitedType) and rt.is_noreturn:
|
||||
elif isinstance(rt, UninhabitedType):
|
||||
# There is no return value, just consider it cachable. This is only used
|
||||
# in tests.
|
||||
return True, None
|
||||
|
||||
@@ -40,7 +40,7 @@ import commonmark
|
||||
import git
|
||||
from click.exceptions import ClickException
|
||||
from git import GitCommandError, Repo
|
||||
from github import Github
|
||||
from github import BadCredentialsException, Github
|
||||
from packaging import version
|
||||
|
||||
|
||||
@@ -323,10 +323,8 @@ def tag(gh_token: Optional[str]) -> None:
|
||||
def _tag(gh_token: Optional[str]) -> None:
|
||||
"""Tags the release and generates a draft GitHub release"""
|
||||
|
||||
if gh_token:
|
||||
# Test that the GH Token is valid before continuing.
|
||||
gh = Github(gh_token)
|
||||
gh.get_user()
|
||||
# Test that the GH Token is valid before continuing.
|
||||
check_valid_gh_token(gh_token)
|
||||
|
||||
# Make sure we're in a git repo.
|
||||
repo = get_repo_and_check_clean_checkout()
|
||||
@@ -469,10 +467,8 @@ def upload(gh_token: Optional[str]) -> None:
|
||||
def _upload(gh_token: Optional[str]) -> None:
|
||||
"""Upload release to pypi."""
|
||||
|
||||
if gh_token:
|
||||
# Test that the GH Token is valid before continuing.
|
||||
gh = Github(gh_token)
|
||||
gh.get_user()
|
||||
# Test that the GH Token is valid before continuing.
|
||||
check_valid_gh_token(gh_token)
|
||||
|
||||
current_version = get_package_version()
|
||||
tag_name = f"v{current_version}"
|
||||
@@ -569,10 +565,8 @@ def wait_for_actions(gh_token: Optional[str]) -> None:
|
||||
|
||||
|
||||
def _wait_for_actions(gh_token: Optional[str]) -> None:
|
||||
if gh_token:
|
||||
# Test that the GH Token is valid before continuing.
|
||||
gh = Github(gh_token)
|
||||
gh.get_user()
|
||||
# Test that the GH Token is valid before continuing.
|
||||
check_valid_gh_token(gh_token)
|
||||
|
||||
# Find out the version and tag name.
|
||||
current_version = get_package_version()
|
||||
@@ -806,6 +800,22 @@ def get_repo_and_check_clean_checkout(
|
||||
return repo
|
||||
|
||||
|
||||
def check_valid_gh_token(gh_token: Optional[str]) -> None:
|
||||
"""Check that a github token is valid, if supplied"""
|
||||
|
||||
if not gh_token:
|
||||
# No github token supplied, so nothing to do.
|
||||
return
|
||||
|
||||
try:
|
||||
gh = Github(gh_token)
|
||||
|
||||
# We need to lookup name to trigger a request.
|
||||
_name = gh.get_user().name
|
||||
except BadCredentialsException as e:
|
||||
raise click.ClickException(f"Github credentials are bad: {e}")
|
||||
|
||||
|
||||
def find_ref(repo: git.Repo, ref_name: str) -> Optional[git.HEAD]:
|
||||
"""Find the branch/ref, looking first locally then in the remote."""
|
||||
if ref_name in repo.references:
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
#
|
||||
#
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from packaging.version import Version
|
||||
|
||||
try:
|
||||
@@ -30,4 +32,64 @@ except ImportError:
|
||||
|
||||
HAS_PYDANTIC_V2: bool = Version(pydantic_version).major == 2
|
||||
|
||||
__all__ = ("HAS_PYDANTIC_V2",)
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import (
|
||||
BaseModel,
|
||||
Extra,
|
||||
Field,
|
||||
MissingError,
|
||||
PydanticValueError,
|
||||
StrictBool,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
ValidationError,
|
||||
conbytes,
|
||||
confloat,
|
||||
conint,
|
||||
constr,
|
||||
parse_obj_as,
|
||||
validator,
|
||||
)
|
||||
from pydantic.v1.error_wrappers import ErrorWrapper
|
||||
from pydantic.v1.typing import get_args
|
||||
else:
|
||||
from pydantic import (
|
||||
BaseModel,
|
||||
Extra,
|
||||
Field,
|
||||
MissingError,
|
||||
PydanticValueError,
|
||||
StrictBool,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
ValidationError,
|
||||
conbytes,
|
||||
confloat,
|
||||
conint,
|
||||
constr,
|
||||
parse_obj_as,
|
||||
validator,
|
||||
)
|
||||
from pydantic.error_wrappers import ErrorWrapper
|
||||
from pydantic.typing import get_args
|
||||
|
||||
__all__ = (
|
||||
"HAS_PYDANTIC_V2",
|
||||
"BaseModel",
|
||||
"constr",
|
||||
"conbytes",
|
||||
"conint",
|
||||
"confloat",
|
||||
"ErrorWrapper",
|
||||
"Extra",
|
||||
"Field",
|
||||
"get_args",
|
||||
"MissingError",
|
||||
"parse_obj_as",
|
||||
"PydanticValueError",
|
||||
"StrictBool",
|
||||
"StrictInt",
|
||||
"StrictStr",
|
||||
"ValidationError",
|
||||
"validator",
|
||||
)
|
||||
|
||||
@@ -338,7 +338,7 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
logger.exception("Failed to introspect token")
|
||||
raise SynapseError(503, "Unable to introspect the access token")
|
||||
|
||||
logger.info(f"Introspection result: {introspection_result!r}")
|
||||
logger.debug("Introspection result: %r", introspection_result)
|
||||
|
||||
# TODO: introspection verification should be more extensive, especially:
|
||||
# - verify the audience
|
||||
|
||||
@@ -107,6 +107,8 @@ class RoomVersion:
|
||||
# support the flag. Unknown flags are ignored by the evaluator, making conditions
|
||||
# fail if used.
|
||||
msc3931_push_features: Tuple[str, ...] # values from PushRuleRoomFlag
|
||||
# MSC3757: Restricting who can overwrite a state event
|
||||
msc3757_enabled: bool
|
||||
|
||||
|
||||
class RoomVersions:
|
||||
@@ -128,6 +130,7 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=False,
|
||||
enforce_int_power_levels=False,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
V2 = RoomVersion(
|
||||
"2",
|
||||
@@ -147,6 +150,7 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=False,
|
||||
enforce_int_power_levels=False,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
V3 = RoomVersion(
|
||||
"3",
|
||||
@@ -166,6 +170,7 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=False,
|
||||
enforce_int_power_levels=False,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
V4 = RoomVersion(
|
||||
"4",
|
||||
@@ -185,6 +190,7 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=False,
|
||||
enforce_int_power_levels=False,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
V5 = RoomVersion(
|
||||
"5",
|
||||
@@ -204,6 +210,7 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=False,
|
||||
enforce_int_power_levels=False,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
V6 = RoomVersion(
|
||||
"6",
|
||||
@@ -223,6 +230,7 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=False,
|
||||
enforce_int_power_levels=False,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
V7 = RoomVersion(
|
||||
"7",
|
||||
@@ -242,6 +250,7 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=False,
|
||||
enforce_int_power_levels=False,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
V8 = RoomVersion(
|
||||
"8",
|
||||
@@ -261,6 +270,7 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=False,
|
||||
enforce_int_power_levels=False,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
V9 = RoomVersion(
|
||||
"9",
|
||||
@@ -280,6 +290,7 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=False,
|
||||
enforce_int_power_levels=False,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
V10 = RoomVersion(
|
||||
"10",
|
||||
@@ -299,6 +310,7 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=True,
|
||||
enforce_int_power_levels=True,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
MSC1767v10 = RoomVersion(
|
||||
# MSC1767 (Extensible Events) based on room version "10"
|
||||
@@ -319,6 +331,28 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=True,
|
||||
enforce_int_power_levels=True,
|
||||
msc3931_push_features=(PushRuleRoomFlag.EXTENSIBLE_EVENTS,),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
MSC3757v10 = RoomVersion(
|
||||
# MSC3757 (Restricting who can overwrite a state event) based on room version "10"
|
||||
"org.matrix.msc3757.10",
|
||||
RoomDisposition.UNSTABLE,
|
||||
EventFormatVersions.ROOM_V4_PLUS,
|
||||
StateResolutionVersions.V2,
|
||||
enforce_key_validity=True,
|
||||
special_case_aliases_auth=False,
|
||||
strict_canonicaljson=True,
|
||||
limit_notifications_power_levels=True,
|
||||
implicit_room_creator=False,
|
||||
updated_redaction_rules=False,
|
||||
restricted_join_rule=True,
|
||||
restricted_join_rule_fix=True,
|
||||
knock_join_rule=True,
|
||||
msc3389_relation_redactions=False,
|
||||
knock_restricted_join_rule=True,
|
||||
enforce_int_power_levels=True,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=True,
|
||||
)
|
||||
V11 = RoomVersion(
|
||||
"11",
|
||||
@@ -338,6 +372,28 @@ class RoomVersions:
|
||||
knock_restricted_join_rule=True,
|
||||
enforce_int_power_levels=True,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=False,
|
||||
)
|
||||
MSC3757v11 = RoomVersion(
|
||||
# MSC3757 (Restricting who can overwrite a state event) based on room version "11"
|
||||
"org.matrix.msc3757.11",
|
||||
RoomDisposition.UNSTABLE,
|
||||
EventFormatVersions.ROOM_V4_PLUS,
|
||||
StateResolutionVersions.V2,
|
||||
enforce_key_validity=True,
|
||||
special_case_aliases_auth=False,
|
||||
strict_canonicaljson=True,
|
||||
limit_notifications_power_levels=True,
|
||||
implicit_room_creator=True, # Used by MSC3820
|
||||
updated_redaction_rules=True, # Used by MSC3820
|
||||
restricted_join_rule=True,
|
||||
restricted_join_rule_fix=True,
|
||||
knock_join_rule=True,
|
||||
msc3389_relation_redactions=False,
|
||||
knock_restricted_join_rule=True,
|
||||
enforce_int_power_levels=True,
|
||||
msc3931_push_features=(),
|
||||
msc3757_enabled=True,
|
||||
)
|
||||
|
||||
|
||||
@@ -355,6 +411,8 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
|
||||
RoomVersions.V9,
|
||||
RoomVersions.V10,
|
||||
RoomVersions.V11,
|
||||
RoomVersions.MSC3757v10,
|
||||
RoomVersions.MSC3757v11,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
#
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
# Copyright (C) 2023-2024 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
@@ -65,6 +65,7 @@ from synapse.storage.databases.main.appservice import (
|
||||
)
|
||||
from synapse.storage.databases.main.censor_events import CensorEventsStore
|
||||
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
|
||||
from synapse.storage.databases.main.delayed_events import DelayedEventsStore
|
||||
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
|
||||
from synapse.storage.databases.main.devices import DeviceWorkerStore
|
||||
from synapse.storage.databases.main.directory import DirectoryWorkerStore
|
||||
@@ -161,6 +162,7 @@ class GenericWorkerStore(
|
||||
TaskSchedulerWorkerStore,
|
||||
ExperimentalFeaturesStore,
|
||||
SlidingSyncStore,
|
||||
DelayedEventsStore,
|
||||
):
|
||||
# Properties that multiple storage classes define. Tell mypy what the
|
||||
# expected type is.
|
||||
|
||||
@@ -18,17 +18,11 @@
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
from typing import TYPE_CHECKING, Any, Dict, Type, TypeVar
|
||||
from typing import Any, Dict, Type, TypeVar
|
||||
|
||||
import jsonschema
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import BaseModel, ValidationError, parse_obj_as
|
||||
else:
|
||||
from pydantic import BaseModel, ValidationError, parse_obj_as
|
||||
|
||||
from synapse._pydantic_compat import BaseModel, ValidationError, parse_obj_as
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.types import JsonDict, StrSequence
|
||||
|
||||
|
||||
@@ -447,3 +447,6 @@ class ExperimentalConfig(Config):
|
||||
|
||||
# MSC4151: Report room API (Client-Server API)
|
||||
self.msc4151_enabled: bool = experimental.get("msc4151_enabled", False)
|
||||
|
||||
# MSC4210: Remove legacy mentions
|
||||
self.msc4210_enabled: bool = experimental.get("msc4210_enabled", False)
|
||||
|
||||
@@ -38,6 +38,7 @@ class JWTConfig(Config):
|
||||
self.jwt_algorithm = jwt_config["algorithm"]
|
||||
|
||||
self.jwt_subject_claim = jwt_config.get("subject_claim", "sub")
|
||||
self.jwt_display_name_claim = jwt_config.get("display_name_claim")
|
||||
|
||||
# The issuer and audiences are optional, if provided, it is asserted
|
||||
# that the claims exist on the JWT.
|
||||
@@ -49,5 +50,6 @@ class JWTConfig(Config):
|
||||
self.jwt_secret = None
|
||||
self.jwt_algorithm = None
|
||||
self.jwt_subject_claim = None
|
||||
self.jwt_display_name_claim = None
|
||||
self.jwt_issuer = None
|
||||
self.jwt_audiences = None
|
||||
|
||||
@@ -21,10 +21,15 @@
|
||||
|
||||
from typing import Any
|
||||
|
||||
from synapse.config._base import Config
|
||||
from synapse.config._base import Config, ConfigError, read_file
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.check_dependencies import check_requirements
|
||||
|
||||
CONFLICTING_PASSWORD_OPTS_ERROR = """\
|
||||
You have configured both `redis.password` and `redis.password_path`.
|
||||
These are mutually incompatible.
|
||||
"""
|
||||
|
||||
|
||||
class RedisConfig(Config):
|
||||
section = "redis"
|
||||
@@ -43,6 +48,17 @@ class RedisConfig(Config):
|
||||
self.redis_path = redis_config.get("path", None)
|
||||
self.redis_dbid = redis_config.get("dbid", None)
|
||||
self.redis_password = redis_config.get("password")
|
||||
redis_password_path = redis_config.get("password_path")
|
||||
if redis_password_path:
|
||||
if self.redis_password:
|
||||
raise ConfigError(CONFLICTING_PASSWORD_OPTS_ERROR)
|
||||
self.redis_password = read_file(
|
||||
redis_password_path,
|
||||
(
|
||||
"redis",
|
||||
"password_path",
|
||||
),
|
||||
).strip()
|
||||
|
||||
self.redis_use_tls = redis_config.get("use_tls", False)
|
||||
self.redis_certificate = redis_config.get("certificate_file", None)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
# Copyright (C) 2023-2024 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
@@ -780,6 +780,17 @@ class ServerConfig(Config):
|
||||
else:
|
||||
self.delete_stale_devices_after = None
|
||||
|
||||
# The maximum allowed delay duration for delayed events (MSC4140).
|
||||
max_event_delay_duration = config.get("max_event_delay_duration")
|
||||
if max_event_delay_duration is not None:
|
||||
self.max_event_delay_ms: Optional[int] = self.parse_duration(
|
||||
max_event_delay_duration
|
||||
)
|
||||
if self.max_event_delay_ms <= 0:
|
||||
raise ConfigError("max_event_delay_duration must be a positive value")
|
||||
else:
|
||||
self.max_event_delay_ms = None
|
||||
|
||||
def has_tls_listener(self) -> bool:
|
||||
return any(listener.is_tls() for listener in self.listeners)
|
||||
|
||||
|
||||
@@ -23,7 +23,12 @@ from typing import Any
|
||||
|
||||
from synapse.types import JsonDict
|
||||
|
||||
from ._base import Config
|
||||
from ._base import Config, ConfigError, read_file
|
||||
|
||||
CONFLICTING_SHARED_SECRET_OPTS_ERROR = """\
|
||||
You have configured both `turn_shared_secret` and `turn_shared_secret_path`.
|
||||
These are mutually incompatible.
|
||||
"""
|
||||
|
||||
|
||||
class VoipConfig(Config):
|
||||
@@ -32,6 +37,13 @@ class VoipConfig(Config):
|
||||
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
|
||||
self.turn_uris = config.get("turn_uris", [])
|
||||
self.turn_shared_secret = config.get("turn_shared_secret")
|
||||
turn_shared_secret_path = config.get("turn_shared_secret_path")
|
||||
if turn_shared_secret_path:
|
||||
if self.turn_shared_secret:
|
||||
raise ConfigError(CONFLICTING_SHARED_SECRET_OPTS_ERROR)
|
||||
self.turn_shared_secret = read_file(
|
||||
turn_shared_secret_path, ("turn_shared_secret_path",)
|
||||
).strip()
|
||||
self.turn_username = config.get("turn_username")
|
||||
self.turn_password = config.get("turn_password")
|
||||
self.turn_user_lifetime = self.parse_duration(
|
||||
|
||||
@@ -22,17 +22,17 @@
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
import attr
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import BaseModel, Extra, StrictBool, StrictInt, StrictStr
|
||||
else:
|
||||
from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
|
||||
|
||||
from synapse._pydantic_compat import (
|
||||
BaseModel,
|
||||
Extra,
|
||||
StrictBool,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
)
|
||||
from synapse.config._base import (
|
||||
Config,
|
||||
ConfigError,
|
||||
|
||||
@@ -388,6 +388,7 @@ LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = {
|
||||
RoomVersions.V9,
|
||||
RoomVersions.V10,
|
||||
RoomVersions.MSC1767v10,
|
||||
RoomVersions.MSC3757v10,
|
||||
}
|
||||
|
||||
|
||||
@@ -790,9 +791,10 @@ def get_send_level(
|
||||
|
||||
|
||||
def _can_send_event(event: "EventBase", auth_events: StateMap["EventBase"]) -> bool:
|
||||
state_key = event.get_state_key()
|
||||
power_levels_event = get_power_level_event(auth_events)
|
||||
|
||||
send_level = get_send_level(event.type, event.get("state_key"), power_levels_event)
|
||||
send_level = get_send_level(event.type, state_key, power_levels_event)
|
||||
user_level = get_user_power_level(event.user_id, auth_events)
|
||||
|
||||
if user_level < send_level:
|
||||
@@ -803,11 +805,34 @@ def _can_send_event(event: "EventBase", auth_events: StateMap["EventBase"]) -> b
|
||||
errcode=Codes.INSUFFICIENT_POWER,
|
||||
)
|
||||
|
||||
# Check state_key
|
||||
if hasattr(event, "state_key"):
|
||||
if event.state_key.startswith("@"):
|
||||
if event.state_key != event.user_id:
|
||||
raise AuthError(403, "You are not allowed to set others state")
|
||||
if (
|
||||
state_key is not None
|
||||
and state_key.startswith("@")
|
||||
and state_key != event.user_id
|
||||
):
|
||||
if event.room_version.msc3757_enabled:
|
||||
try:
|
||||
colon_idx = state_key.index(":", 1)
|
||||
suffix_idx = state_key.find("_", colon_idx + 1)
|
||||
state_key_user_id = (
|
||||
state_key[:suffix_idx] if suffix_idx != -1 else state_key
|
||||
)
|
||||
if not UserID.is_valid(state_key_user_id):
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"State key neither equals a valid user ID, nor starts with one plus an underscore",
|
||||
errcode=Codes.BAD_JSON,
|
||||
)
|
||||
if (
|
||||
# sender is owner of the state key
|
||||
state_key_user_id == event.user_id
|
||||
# sender has higher PL than the owner of the state key
|
||||
or user_level > get_user_power_level(state_key_user_id, auth_events)
|
||||
):
|
||||
return True
|
||||
raise AuthError(403, "You are not allowed to set others state")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@@ -19,17 +19,11 @@
|
||||
#
|
||||
#
|
||||
import collections.abc
|
||||
from typing import TYPE_CHECKING, List, Type, Union, cast
|
||||
from typing import List, Type, Union, cast
|
||||
|
||||
import jsonschema
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import Field, StrictBool, StrictStr
|
||||
else:
|
||||
from pydantic import Field, StrictBool, StrictStr
|
||||
|
||||
from synapse._pydantic_compat import Field, StrictBool, StrictStr
|
||||
from synapse.api.constants import (
|
||||
MAX_ALIAS_LENGTH,
|
||||
EventContentFields,
|
||||
|
||||
@@ -113,7 +113,7 @@ class Authenticator:
|
||||
):
|
||||
raise AuthenticationError(
|
||||
HTTPStatus.UNAUTHORIZED,
|
||||
"Destination mismatch in auth header",
|
||||
f"Destination mismatch in auth header, received: {destination!r}",
|
||||
Codes.UNAUTHORIZED,
|
||||
)
|
||||
if (
|
||||
|
||||
@@ -33,7 +33,7 @@ from synapse.replication.http.account_data import (
|
||||
ReplicationRemoveUserAccountDataRestServlet,
|
||||
)
|
||||
from synapse.streams import EventSource
|
||||
from synapse.types import JsonDict, StrCollection, StreamKeyType, UserID
|
||||
from synapse.types import JsonDict, JsonMapping, StrCollection, StreamKeyType, UserID
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -253,7 +253,7 @@ class AccountDataHandler:
|
||||
return response["max_stream_id"]
|
||||
|
||||
async def add_tag_to_room(
|
||||
self, user_id: str, room_id: str, tag: str, content: JsonDict
|
||||
self, user_id: str, room_id: str, tag: str, content: JsonMapping
|
||||
) -> int:
|
||||
"""Add a tag to a room for a user.
|
||||
|
||||
|
||||
@@ -21,13 +21,34 @@
|
||||
|
||||
import abc
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Set
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.api.constants import Direction, Membership
|
||||
from synapse.api.constants import Direction, EventTypes, Membership
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import JsonMapping, RoomStreamToken, StateMap, UserID, UserInfo
|
||||
from synapse.types import (
|
||||
JsonMapping,
|
||||
Requester,
|
||||
RoomStreamToken,
|
||||
ScheduledTask,
|
||||
StateMap,
|
||||
TaskStatus,
|
||||
UserID,
|
||||
UserInfo,
|
||||
create_requester,
|
||||
)
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -35,6 +56,8 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
REDACT_ALL_EVENTS_ACTION_NAME = "redact_all_events"
|
||||
|
||||
|
||||
class AdminHandler:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
@@ -43,6 +66,22 @@ class AdminHandler:
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self._state_storage_controller = self._storage_controllers.state
|
||||
self._msc3866_enabled = hs.config.experimental.msc3866.enabled
|
||||
self.event_creation_handler = hs.get_event_creation_handler()
|
||||
self._task_scheduler = hs.get_task_scheduler()
|
||||
|
||||
self._task_scheduler.register_action(
|
||||
self._redact_all_events, REDACT_ALL_EVENTS_ACTION_NAME
|
||||
)
|
||||
|
||||
self.hs = hs
|
||||
|
||||
async def get_redact_task(self, redact_id: str) -> Optional[ScheduledTask]:
|
||||
"""Get the current status of an active redaction process
|
||||
|
||||
Args:
|
||||
redact_id: redact_id returned by start_redact_events.
|
||||
"""
|
||||
return await self._task_scheduler.get_task(redact_id)
|
||||
|
||||
async def get_whois(self, user: UserID) -> JsonMapping:
|
||||
connections = []
|
||||
@@ -313,6 +352,155 @@ class AdminHandler:
|
||||
|
||||
return writer.finished()
|
||||
|
||||
async def start_redact_events(
|
||||
self,
|
||||
user_id: str,
|
||||
rooms: list,
|
||||
requester: JsonMapping,
|
||||
reason: Optional[str],
|
||||
limit: Optional[int],
|
||||
) -> str:
|
||||
"""
|
||||
Start a task redacting the events of the given user in the given rooms
|
||||
|
||||
Args:
|
||||
user_id: the user ID of the user whose events should be redacted
|
||||
rooms: the rooms in which to redact the user's events
|
||||
requester: the user requesting the events
|
||||
reason: reason for requesting the redaction, ie spam, etc
|
||||
limit: limit on the number of events in each room to redact
|
||||
|
||||
Returns:
|
||||
a unique ID which can be used to query the status of the task
|
||||
"""
|
||||
active_tasks = await self._task_scheduler.get_tasks(
|
||||
actions=[REDACT_ALL_EVENTS_ACTION_NAME],
|
||||
resource_id=user_id,
|
||||
statuses=[TaskStatus.ACTIVE],
|
||||
)
|
||||
|
||||
if len(active_tasks) > 0:
|
||||
raise SynapseError(
|
||||
400, "Redact already in progress for user %s" % (user_id,)
|
||||
)
|
||||
|
||||
if not limit:
|
||||
limit = 1000
|
||||
|
||||
redact_id = await self._task_scheduler.schedule_task(
|
||||
REDACT_ALL_EVENTS_ACTION_NAME,
|
||||
resource_id=user_id,
|
||||
params={
|
||||
"rooms": rooms,
|
||||
"requester": requester,
|
||||
"user_id": user_id,
|
||||
"reason": reason,
|
||||
"limit": limit,
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"starting redact events with redact_id %s",
|
||||
redact_id,
|
||||
)
|
||||
|
||||
return redact_id
|
||||
|
||||
async def _redact_all_events(
|
||||
self, task: ScheduledTask
|
||||
) -> Tuple[TaskStatus, Optional[Mapping[str, Any]], Optional[str]]:
|
||||
"""
|
||||
Task to redact all of a users events in the given rooms, tracking which, if any, events
|
||||
whose redaction failed
|
||||
"""
|
||||
|
||||
assert task.params is not None
|
||||
rooms = task.params.get("rooms")
|
||||
assert rooms is not None
|
||||
|
||||
r = task.params.get("requester")
|
||||
assert r is not None
|
||||
admin = Requester.deserialize(self._store, r)
|
||||
|
||||
user_id = task.params.get("user_id")
|
||||
assert user_id is not None
|
||||
|
||||
# puppet the user if they're ours, otherwise use admin to redact
|
||||
requester = create_requester(
|
||||
user_id if self.hs.is_mine_id(user_id) else admin.user.to_string(),
|
||||
authenticated_entity=admin.user.to_string(),
|
||||
)
|
||||
|
||||
reason = task.params.get("reason")
|
||||
limit = task.params.get("limit")
|
||||
assert limit is not None
|
||||
|
||||
result: Mapping[str, Any] = (
|
||||
task.result if task.result else {"failed_redactions": {}}
|
||||
)
|
||||
for room in rooms:
|
||||
room_version = await self._store.get_room_version(room)
|
||||
event_ids = await self._store.get_events_sent_by_user_in_room(
|
||||
user_id,
|
||||
room,
|
||||
limit,
|
||||
["m.room.member", "m.room.message"],
|
||||
)
|
||||
if not event_ids:
|
||||
# nothing to redact in this room
|
||||
continue
|
||||
|
||||
events = await self._store.get_events_as_list(event_ids)
|
||||
for event in events:
|
||||
# we care about join events but not other membership events
|
||||
if event.type == "m.room.member":
|
||||
content = event.content
|
||||
if content:
|
||||
if content.get("membership") == Membership.JOIN:
|
||||
pass
|
||||
else:
|
||||
continue
|
||||
relations = await self._store.get_relations_for_event(
|
||||
room, event.event_id, event, event_type=EventTypes.Redaction
|
||||
)
|
||||
|
||||
# if we've already successfully redacted this event then skip processing it
|
||||
if relations[0]:
|
||||
continue
|
||||
|
||||
event_dict = {
|
||||
"type": EventTypes.Redaction,
|
||||
"content": {"reason": reason} if reason else {},
|
||||
"room_id": room,
|
||||
"sender": user_id,
|
||||
}
|
||||
if room_version.updated_redaction_rules:
|
||||
event_dict["content"]["redacts"] = event.event_id
|
||||
else:
|
||||
event_dict["redacts"] = event.event_id
|
||||
|
||||
try:
|
||||
# set the prev event to the offending message to allow for redactions
|
||||
# to be processed in the case where the user has been kicked/banned before
|
||||
# redactions are requested
|
||||
(
|
||||
redaction,
|
||||
_,
|
||||
) = await self.event_creation_handler.create_and_send_nonmember_event(
|
||||
requester,
|
||||
event_dict,
|
||||
prev_event_ids=[event.event_id],
|
||||
ratelimit=False,
|
||||
)
|
||||
except Exception as ex:
|
||||
logger.info(
|
||||
f"Redaction of event {event.event_id} failed due to: {ex}"
|
||||
)
|
||||
result["failed_redactions"][event.event_id] = str(ex)
|
||||
await self._task_scheduler.update_task(task.id, result=result)
|
||||
|
||||
return TaskStatus.COMPLETE, result, None
|
||||
|
||||
|
||||
class ExfiltrationWriter(metaclass=abc.ABCMeta):
|
||||
"""Interface used to specify how to write exported data."""
|
||||
|
||||
498
synapse/handlers/delayed_events.py
Normal file
498
synapse/handlers/delayed_events.py
Normal file
@@ -0,0 +1,498 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2024 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, List, Optional, Set, Tuple
|
||||
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import ShadowBanError
|
||||
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
|
||||
from synapse.logging.opentracing import set_tag
|
||||
from synapse.metrics import event_processing_positions
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.http.delayed_events import (
|
||||
ReplicationAddedDelayedEventRestServlet,
|
||||
)
|
||||
from synapse.storage.databases.main.delayed_events import (
|
||||
DelayedEventDetails,
|
||||
DelayID,
|
||||
EventType,
|
||||
StateKey,
|
||||
Timestamp,
|
||||
UserLocalpart,
|
||||
)
|
||||
from synapse.storage.databases.main.state_deltas import StateDelta
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
Requester,
|
||||
RoomID,
|
||||
UserID,
|
||||
create_requester,
|
||||
)
|
||||
from synapse.util.events import generate_fake_event_id
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DelayedEventsHandler:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self._config = hs.config
|
||||
self._clock = hs.get_clock()
|
||||
self._request_ratelimiter = hs.get_request_ratelimiter()
|
||||
self._event_creation_handler = hs.get_event_creation_handler()
|
||||
self._room_member_handler = hs.get_room_member_handler()
|
||||
|
||||
self._next_delayed_event_call: Optional[IDelayedCall] = None
|
||||
|
||||
# The current position in the current_state_delta stream
|
||||
self._event_pos: Optional[int] = None
|
||||
|
||||
# Guard to ensure we only process event deltas one at a time
|
||||
self._event_processing = False
|
||||
|
||||
if hs.config.worker.worker_app is None:
|
||||
self._repl_client = None
|
||||
|
||||
async def _schedule_db_events() -> None:
|
||||
# We kick this off to pick up outstanding work from before the last restart.
|
||||
# Block until we're up to date.
|
||||
await self._unsafe_process_new_event()
|
||||
hs.get_notifier().add_replication_callback(self.notify_new_event)
|
||||
# Kick off again (without blocking) to catch any missed notifications
|
||||
# that may have fired before the callback was added.
|
||||
self._clock.call_later(0, self.notify_new_event)
|
||||
|
||||
# Delayed events that are already marked as processed on startup might not have been
|
||||
# sent properly on the last run of the server, so unmark them to send them again.
|
||||
# Caveat: this will double-send delayed events that successfully persisted, but failed
|
||||
# to be removed from the DB table of delayed events.
|
||||
# TODO: To avoid double-sending, scan the timeline to find which of these events were
|
||||
# already sent. To do so, must store delay_ids in sent events to retrieve them later.
|
||||
await self._store.unprocess_delayed_events()
|
||||
|
||||
events, next_send_ts = await self._store.process_timeout_delayed_events(
|
||||
self._get_current_ts()
|
||||
)
|
||||
|
||||
if next_send_ts:
|
||||
self._schedule_next_at(next_send_ts)
|
||||
|
||||
# Can send the events in background after having awaited on marking them as processed
|
||||
run_as_background_process(
|
||||
"_send_events",
|
||||
self._send_events,
|
||||
events,
|
||||
)
|
||||
|
||||
self._initialized_from_db = run_as_background_process(
|
||||
"_schedule_db_events", _schedule_db_events
|
||||
)
|
||||
else:
|
||||
self._repl_client = ReplicationAddedDelayedEventRestServlet.make_client(hs)
|
||||
|
||||
@property
|
||||
def _is_master(self) -> bool:
|
||||
return self._repl_client is None
|
||||
|
||||
def notify_new_event(self) -> None:
|
||||
"""
|
||||
Called when there may be more state event deltas to process,
|
||||
which should cancel pending delayed events for the same state.
|
||||
"""
|
||||
if self._event_processing:
|
||||
return
|
||||
|
||||
self._event_processing = True
|
||||
|
||||
async def process() -> None:
|
||||
try:
|
||||
await self._unsafe_process_new_event()
|
||||
finally:
|
||||
self._event_processing = False
|
||||
|
||||
run_as_background_process("delayed_events.notify_new_event", process)
|
||||
|
||||
async def _unsafe_process_new_event(self) -> None:
|
||||
# If self._event_pos is None then means we haven't fetched it from the DB yet
|
||||
if self._event_pos is None:
|
||||
self._event_pos = await self._store.get_delayed_events_stream_pos()
|
||||
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
||||
if self._event_pos > room_max_stream_ordering:
|
||||
# apparently, we've processed more events than exist in the database!
|
||||
# this can happen if events are removed with history purge or similar.
|
||||
logger.warning(
|
||||
"Event stream ordering appears to have gone backwards (%i -> %i): "
|
||||
"rewinding delayed events processor",
|
||||
self._event_pos,
|
||||
room_max_stream_ordering,
|
||||
)
|
||||
self._event_pos = room_max_stream_ordering
|
||||
|
||||
# Loop round handling deltas until we're up to date
|
||||
while True:
|
||||
with Measure(self._clock, "delayed_events_delta"):
|
||||
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
||||
if self._event_pos == room_max_stream_ordering:
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
"Processing delayed events %s->%s",
|
||||
self._event_pos,
|
||||
room_max_stream_ordering,
|
||||
)
|
||||
(
|
||||
max_pos,
|
||||
deltas,
|
||||
) = await self._storage_controllers.state.get_current_state_deltas(
|
||||
self._event_pos, room_max_stream_ordering
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"Handling %d state deltas for delayed events processing",
|
||||
len(deltas),
|
||||
)
|
||||
await self._handle_state_deltas(deltas)
|
||||
|
||||
self._event_pos = max_pos
|
||||
|
||||
# Expose current event processing position to prometheus
|
||||
event_processing_positions.labels("delayed_events").set(max_pos)
|
||||
|
||||
await self._store.update_delayed_events_stream_pos(max_pos)
|
||||
|
||||
async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
|
||||
"""
|
||||
Process current state deltas to cancel pending delayed events
|
||||
that target the same state.
|
||||
"""
|
||||
for delta in deltas:
|
||||
logger.debug(
|
||||
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
|
||||
)
|
||||
|
||||
next_send_ts = await self._store.cancel_delayed_state_events(
|
||||
room_id=delta.room_id,
|
||||
event_type=delta.event_type,
|
||||
state_key=delta.state_key,
|
||||
)
|
||||
|
||||
if self._next_send_ts_changed(next_send_ts):
|
||||
self._schedule_next_at_or_none(next_send_ts)
|
||||
|
||||
async def add(
|
||||
self,
|
||||
requester: Requester,
|
||||
*,
|
||||
room_id: str,
|
||||
event_type: str,
|
||||
state_key: Optional[str],
|
||||
origin_server_ts: Optional[int],
|
||||
content: JsonDict,
|
||||
delay: int,
|
||||
) -> str:
|
||||
"""
|
||||
Creates a new delayed event and schedules its delivery.
|
||||
|
||||
Args:
|
||||
requester: The requester of the delayed event, who will be its owner.
|
||||
room_id: The ID of the room where the event should be sent to.
|
||||
event_type: The type of event to be sent.
|
||||
state_key: The state key of the event to be sent, or None if it is not a state event.
|
||||
origin_server_ts: The custom timestamp to send the event with.
|
||||
If None, the timestamp will be the actual time when the event is sent.
|
||||
content: The content of the event to be sent.
|
||||
delay: How long (in milliseconds) to wait before automatically sending the event.
|
||||
|
||||
Returns: The ID of the added delayed event.
|
||||
|
||||
Raises:
|
||||
SynapseError: if the delayed event fails validation checks.
|
||||
"""
|
||||
await self._request_ratelimiter.ratelimit(requester)
|
||||
|
||||
self._event_creation_handler.validator.validate_builder(
|
||||
self._event_creation_handler.event_builder_factory.for_room_version(
|
||||
await self._store.get_room_version(room_id),
|
||||
{
|
||||
"type": event_type,
|
||||
"content": content,
|
||||
"room_id": room_id,
|
||||
"sender": str(requester.user),
|
||||
**({"state_key": state_key} if state_key is not None else {}),
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
creation_ts = self._get_current_ts()
|
||||
|
||||
delay_id, next_send_ts = await self._store.add_delayed_event(
|
||||
user_localpart=requester.user.localpart,
|
||||
device_id=requester.device_id,
|
||||
creation_ts=creation_ts,
|
||||
room_id=room_id,
|
||||
event_type=event_type,
|
||||
state_key=state_key,
|
||||
origin_server_ts=origin_server_ts,
|
||||
content=content,
|
||||
delay=delay,
|
||||
)
|
||||
|
||||
if self._repl_client is not None:
|
||||
# NOTE: If this throws, the delayed event will remain in the DB and
|
||||
# will be picked up once the main worker gets another delayed event.
|
||||
await self._repl_client(
|
||||
instance_name=MAIN_PROCESS_INSTANCE_NAME,
|
||||
next_send_ts=next_send_ts,
|
||||
)
|
||||
elif self._next_send_ts_changed(next_send_ts):
|
||||
self._schedule_next_at(next_send_ts)
|
||||
|
||||
return delay_id
|
||||
|
||||
def on_added(self, next_send_ts: int) -> None:
|
||||
next_send_ts = Timestamp(next_send_ts)
|
||||
if self._next_send_ts_changed(next_send_ts):
|
||||
self._schedule_next_at(next_send_ts)
|
||||
|
||||
async def cancel(self, requester: Requester, delay_id: str) -> None:
|
||||
"""
|
||||
Cancels the scheduled delivery of the matching delayed event.
|
||||
|
||||
Args:
|
||||
requester: The owner of the delayed event to act on.
|
||||
delay_id: The ID of the delayed event to act on.
|
||||
|
||||
Raises:
|
||||
NotFoundError: if no matching delayed event could be found.
|
||||
"""
|
||||
assert self._is_master
|
||||
await self._request_ratelimiter.ratelimit(requester)
|
||||
await self._initialized_from_db
|
||||
|
||||
next_send_ts = await self._store.cancel_delayed_event(
|
||||
delay_id=delay_id,
|
||||
user_localpart=requester.user.localpart,
|
||||
)
|
||||
|
||||
if self._next_send_ts_changed(next_send_ts):
|
||||
self._schedule_next_at_or_none(next_send_ts)
|
||||
|
||||
async def restart(self, requester: Requester, delay_id: str) -> None:
|
||||
"""
|
||||
Restarts the scheduled delivery of the matching delayed event.
|
||||
|
||||
Args:
|
||||
requester: The owner of the delayed event to act on.
|
||||
delay_id: The ID of the delayed event to act on.
|
||||
|
||||
Raises:
|
||||
NotFoundError: if no matching delayed event could be found.
|
||||
"""
|
||||
assert self._is_master
|
||||
await self._request_ratelimiter.ratelimit(requester)
|
||||
await self._initialized_from_db
|
||||
|
||||
next_send_ts = await self._store.restart_delayed_event(
|
||||
delay_id=delay_id,
|
||||
user_localpart=requester.user.localpart,
|
||||
current_ts=self._get_current_ts(),
|
||||
)
|
||||
|
||||
if self._next_send_ts_changed(next_send_ts):
|
||||
self._schedule_next_at(next_send_ts)
|
||||
|
||||
async def send(self, requester: Requester, delay_id: str) -> None:
|
||||
"""
|
||||
Immediately sends the matching delayed event, instead of waiting for its scheduled delivery.
|
||||
|
||||
Args:
|
||||
requester: The owner of the delayed event to act on.
|
||||
delay_id: The ID of the delayed event to act on.
|
||||
|
||||
Raises:
|
||||
NotFoundError: if no matching delayed event could be found.
|
||||
"""
|
||||
assert self._is_master
|
||||
await self._request_ratelimiter.ratelimit(requester)
|
||||
await self._initialized_from_db
|
||||
|
||||
event, next_send_ts = await self._store.process_target_delayed_event(
|
||||
delay_id=delay_id,
|
||||
user_localpart=requester.user.localpart,
|
||||
)
|
||||
|
||||
if self._next_send_ts_changed(next_send_ts):
|
||||
self._schedule_next_at_or_none(next_send_ts)
|
||||
|
||||
await self._send_event(
|
||||
DelayedEventDetails(
|
||||
delay_id=DelayID(delay_id),
|
||||
user_localpart=UserLocalpart(requester.user.localpart),
|
||||
room_id=event.room_id,
|
||||
type=event.type,
|
||||
state_key=event.state_key,
|
||||
origin_server_ts=event.origin_server_ts,
|
||||
content=event.content,
|
||||
device_id=event.device_id,
|
||||
)
|
||||
)
|
||||
|
||||
async def _send_on_timeout(self) -> None:
|
||||
self._next_delayed_event_call = None
|
||||
|
||||
events, next_send_ts = await self._store.process_timeout_delayed_events(
|
||||
self._get_current_ts()
|
||||
)
|
||||
|
||||
if next_send_ts:
|
||||
self._schedule_next_at(next_send_ts)
|
||||
|
||||
await self._send_events(events)
|
||||
|
||||
async def _send_events(self, events: List[DelayedEventDetails]) -> None:
|
||||
sent_state: Set[Tuple[RoomID, EventType, StateKey]] = set()
|
||||
for event in events:
|
||||
if event.state_key is not None:
|
||||
state_info = (event.room_id, event.type, event.state_key)
|
||||
if state_info in sent_state:
|
||||
continue
|
||||
else:
|
||||
state_info = None
|
||||
try:
|
||||
# TODO: send in background if message event or non-conflicting state event
|
||||
await self._send_event(event)
|
||||
if state_info is not None:
|
||||
sent_state.add(state_info)
|
||||
except Exception:
|
||||
logger.exception("Failed to send delayed event")
|
||||
|
||||
for room_id, event_type, state_key in sent_state:
|
||||
await self._store.delete_processed_delayed_state_events(
|
||||
room_id=str(room_id),
|
||||
event_type=event_type,
|
||||
state_key=state_key,
|
||||
)
|
||||
|
||||
def _schedule_next_at_or_none(self, next_send_ts: Optional[Timestamp]) -> None:
|
||||
if next_send_ts is not None:
|
||||
self._schedule_next_at(next_send_ts)
|
||||
elif self._next_delayed_event_call is not None:
|
||||
self._next_delayed_event_call.cancel()
|
||||
self._next_delayed_event_call = None
|
||||
|
||||
def _schedule_next_at(self, next_send_ts: Timestamp) -> None:
|
||||
delay = next_send_ts - self._get_current_ts()
|
||||
delay_sec = delay / 1000 if delay > 0 else 0
|
||||
|
||||
if self._next_delayed_event_call is None:
|
||||
self._next_delayed_event_call = self._clock.call_later(
|
||||
delay_sec,
|
||||
run_as_background_process,
|
||||
"_send_on_timeout",
|
||||
self._send_on_timeout,
|
||||
)
|
||||
else:
|
||||
self._next_delayed_event_call.reset(delay_sec)
|
||||
|
||||
async def get_all_for_user(self, requester: Requester) -> List[JsonDict]:
|
||||
"""Return all pending delayed events requested by the given user."""
|
||||
await self._request_ratelimiter.ratelimit(requester)
|
||||
return await self._store.get_all_delayed_events_for_user(
|
||||
requester.user.localpart
|
||||
)
|
||||
|
||||
async def _send_event(
|
||||
self,
|
||||
event: DelayedEventDetails,
|
||||
txn_id: Optional[str] = None,
|
||||
) -> None:
|
||||
user_id = UserID(event.user_localpart, self._config.server.server_name)
|
||||
user_id_str = user_id.to_string()
|
||||
# Create a new requester from what data is currently available
|
||||
requester = create_requester(
|
||||
user_id,
|
||||
is_guest=await self._store.is_guest(user_id_str),
|
||||
device_id=event.device_id,
|
||||
)
|
||||
|
||||
try:
|
||||
if event.state_key is not None and event.type == EventTypes.Member:
|
||||
membership = event.content.get("membership")
|
||||
assert membership is not None
|
||||
event_id, _ = await self._room_member_handler.update_membership(
|
||||
requester,
|
||||
target=UserID.from_string(event.state_key),
|
||||
room_id=event.room_id.to_string(),
|
||||
action=membership,
|
||||
content=event.content,
|
||||
origin_server_ts=event.origin_server_ts,
|
||||
)
|
||||
else:
|
||||
event_dict: JsonDict = {
|
||||
"type": event.type,
|
||||
"content": event.content,
|
||||
"room_id": event.room_id.to_string(),
|
||||
"sender": user_id_str,
|
||||
}
|
||||
|
||||
if event.origin_server_ts is not None:
|
||||
event_dict["origin_server_ts"] = event.origin_server_ts
|
||||
|
||||
if event.state_key is not None:
|
||||
event_dict["state_key"] = event.state_key
|
||||
|
||||
(
|
||||
sent_event,
|
||||
_,
|
||||
) = await self._event_creation_handler.create_and_send_nonmember_event(
|
||||
requester,
|
||||
event_dict,
|
||||
txn_id=txn_id,
|
||||
)
|
||||
event_id = sent_event.event_id
|
||||
except ShadowBanError:
|
||||
event_id = generate_fake_event_id()
|
||||
finally:
|
||||
# TODO: If this is a temporary error, retry. Otherwise, consider notifying clients of the failure
|
||||
try:
|
||||
await self._store.delete_processed_delayed_event(
|
||||
event.delay_id, event.user_localpart
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to delete processed delayed event")
|
||||
|
||||
set_tag("event_id", event_id)
|
||||
|
||||
def _get_current_ts(self) -> Timestamp:
|
||||
return Timestamp(self._clock.time_msec())
|
||||
|
||||
def _next_send_ts_changed(self, next_send_ts: Optional[Timestamp]) -> bool:
|
||||
# The DB alone knows if the next send time changed after adding/modifying
|
||||
# a delayed event, but if we were to ever miss updating our delayed call's
|
||||
# firing time, we may miss other updates. So, keep track of changes to the
|
||||
# the next send time here instead of in the DB.
|
||||
cached_next_send_ts = (
|
||||
int(self._next_delayed_event_call.getTime() * 1000)
|
||||
if self._next_delayed_event_call is not None
|
||||
else None
|
||||
)
|
||||
return next_send_ts != cached_next_send_ts
|
||||
@@ -48,6 +48,7 @@ from synapse.metrics.background_process_metrics import (
|
||||
wrap_as_background_process,
|
||||
)
|
||||
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
|
||||
from synapse.storage.databases.main.roommember import EventIdMembership
|
||||
from synapse.storage.databases.main.state_deltas import StateDelta
|
||||
from synapse.types import (
|
||||
DeviceListUpdates,
|
||||
@@ -222,7 +223,6 @@ class DeviceWorkerHandler:
|
||||
return changed
|
||||
|
||||
@trace
|
||||
@measure_func("device.get_user_ids_changed")
|
||||
@cancellable
|
||||
async def get_user_ids_changed(
|
||||
self, user_id: str, from_token: StreamToken
|
||||
@@ -290,9 +290,11 @@ class DeviceWorkerHandler:
|
||||
memberships_to_fetch.add(delta.prev_event_id)
|
||||
|
||||
# Fetch all the memberships for the membership events
|
||||
event_id_to_memberships = await self.store.get_membership_from_event_ids(
|
||||
memberships_to_fetch
|
||||
)
|
||||
event_id_to_memberships: Mapping[str, Optional[EventIdMembership]] = {}
|
||||
if memberships_to_fetch:
|
||||
event_id_to_memberships = await self.store.get_membership_from_event_ids(
|
||||
memberships_to_fetch
|
||||
)
|
||||
|
||||
joined_invited_knocked = (
|
||||
Membership.JOIN,
|
||||
@@ -349,7 +351,6 @@ class DeviceWorkerHandler:
|
||||
|
||||
return device_list_updates
|
||||
|
||||
@measure_func("_generate_sync_entry_for_device_list")
|
||||
async def generate_sync_entry_for_device_list(
|
||||
self,
|
||||
user_id: str,
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Optional, Tuple
|
||||
|
||||
from authlib.jose import JsonWebToken, JWTClaims
|
||||
from authlib.jose.errors import BadSignatureError, InvalidClaimError, JoseError
|
||||
@@ -36,11 +36,12 @@ class JwtHandler:
|
||||
|
||||
self.jwt_secret = hs.config.jwt.jwt_secret
|
||||
self.jwt_subject_claim = hs.config.jwt.jwt_subject_claim
|
||||
self.jwt_display_name_claim = hs.config.jwt.jwt_display_name_claim
|
||||
self.jwt_algorithm = hs.config.jwt.jwt_algorithm
|
||||
self.jwt_issuer = hs.config.jwt.jwt_issuer
|
||||
self.jwt_audiences = hs.config.jwt.jwt_audiences
|
||||
|
||||
def validate_login(self, login_submission: JsonDict) -> str:
|
||||
def validate_login(self, login_submission: JsonDict) -> Tuple[str, Optional[str]]:
|
||||
"""
|
||||
Authenticates the user for the /login API
|
||||
|
||||
@@ -49,7 +50,8 @@ class JwtHandler:
|
||||
(including 'type' and other relevant fields)
|
||||
|
||||
Returns:
|
||||
The user ID that is logging in.
|
||||
A tuple of (user_id, display_name) of the user that is logging in.
|
||||
If the JWT does not contain a display name, the second element of the tuple will be None.
|
||||
|
||||
Raises:
|
||||
LoginError if there was an authentication problem.
|
||||
@@ -109,4 +111,10 @@ class JwtHandler:
|
||||
if user is None:
|
||||
raise LoginError(403, "Invalid JWT", errcode=Codes.FORBIDDEN)
|
||||
|
||||
return UserID(user, self.hs.hostname).to_string()
|
||||
default_display_name = None
|
||||
if self.jwt_display_name_claim:
|
||||
display_name_claim = claims.get(self.jwt_display_name_claim)
|
||||
if display_name_claim is not None:
|
||||
default_display_name = display_name_claim
|
||||
|
||||
return UserID(user, self.hs.hostname).to_string(), default_display_name
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
import logging
|
||||
from itertools import chain
|
||||
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Set, Tuple
|
||||
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, Tuple
|
||||
|
||||
from prometheus_client import Histogram
|
||||
from typing_extensions import assert_never
|
||||
@@ -49,6 +49,7 @@ from synapse.types import (
|
||||
Requester,
|
||||
SlidingSyncStreamToken,
|
||||
StateMap,
|
||||
StrCollection,
|
||||
StreamKeyType,
|
||||
StreamToken,
|
||||
)
|
||||
@@ -267,7 +268,7 @@ class SlidingSyncHandler:
|
||||
|
||||
if relevant_rooms_to_send_map:
|
||||
with start_active_span("sliding_sync.generate_room_entries"):
|
||||
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)
|
||||
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 20)
|
||||
|
||||
extensions = await self.extensions.get_extensions_response(
|
||||
sync_config=sync_config,
|
||||
@@ -293,7 +294,6 @@ class SlidingSyncHandler:
|
||||
# to record rooms as having updates even if there might not actually
|
||||
# be anything new for the user (e.g. due to event filters, events
|
||||
# having happened after the user left, etc).
|
||||
unsent_room_ids = []
|
||||
if from_token:
|
||||
# The set of rooms that the client (may) care about, but aren't
|
||||
# in any list range (or subscribed to).
|
||||
@@ -305,15 +305,24 @@ class SlidingSyncHandler:
|
||||
# TODO: Replace this with something faster. When we land the
|
||||
# sliding sync tables that record the most recent event
|
||||
# positions we can use that.
|
||||
missing_event_map_by_room = (
|
||||
await self.store.get_room_events_stream_for_rooms(
|
||||
room_ids=missing_rooms,
|
||||
from_key=to_token.room_key,
|
||||
to_key=from_token.stream_token.room_key,
|
||||
limit=1,
|
||||
unsent_room_ids: StrCollection
|
||||
if await self.store.have_finished_sliding_sync_background_jobs():
|
||||
unsent_room_ids = await (
|
||||
self.store.get_rooms_that_have_updates_since_sliding_sync_table(
|
||||
room_ids=missing_rooms,
|
||||
from_key=from_token.stream_token.room_key,
|
||||
)
|
||||
)
|
||||
)
|
||||
unsent_room_ids = list(missing_event_map_by_room)
|
||||
else:
|
||||
missing_event_map_by_room = (
|
||||
await self.store.get_room_events_stream_for_rooms(
|
||||
room_ids=missing_rooms,
|
||||
from_key=to_token.room_key,
|
||||
to_key=from_token.stream_token.room_key,
|
||||
limit=1,
|
||||
)
|
||||
)
|
||||
unsent_room_ids = list(missing_event_map_by_room)
|
||||
|
||||
new_connection_state.rooms.record_unsent_rooms(
|
||||
unsent_room_ids, from_token.stream_token.room_key
|
||||
@@ -443,13 +452,11 @@ class SlidingSyncHandler:
|
||||
to_token=to_token,
|
||||
)
|
||||
|
||||
event_map = await self.store.get_events(list(state_ids.values()))
|
||||
events = await self.store.get_events_as_list(list(state_ids.values()))
|
||||
|
||||
state_map = {}
|
||||
for key, event_id in state_ids.items():
|
||||
event = event_map.get(event_id)
|
||||
if event:
|
||||
state_map[key] = event
|
||||
for event in events:
|
||||
state_map[(event.type, event.state_key)] = event
|
||||
|
||||
return state_map
|
||||
|
||||
@@ -495,6 +502,26 @@ class SlidingSyncHandler:
|
||||
room_sync_config.timeline_limit,
|
||||
)
|
||||
|
||||
# Handle state resets. For example, if we see
|
||||
# `room_membership_for_user_at_to_token.event_id=None and
|
||||
# room_membership_for_user_at_to_token.membership is not None`, we should
|
||||
# indicate to the client that a state reset happened. Perhaps we should indicate
|
||||
# this by setting `initial: True` and empty `required_state: []`.
|
||||
state_reset_out_of_room = False
|
||||
if (
|
||||
room_membership_for_user_at_to_token.event_id is None
|
||||
and room_membership_for_user_at_to_token.membership is not None
|
||||
):
|
||||
# We only expect the `event_id` to be `None` if you've been state reset out
|
||||
# of the room (meaning you're no longer in the room). We could put this as
|
||||
# part of the if-statement above but we want to handle every case where
|
||||
# `event_id` is `None`.
|
||||
assert room_membership_for_user_at_to_token.membership is Membership.LEAVE
|
||||
|
||||
state_reset_out_of_room = True
|
||||
|
||||
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
|
||||
|
||||
# Determine whether we should limit the timeline to the token range.
|
||||
#
|
||||
# We should return historical messages (before token range) in the
|
||||
@@ -523,11 +550,10 @@ class SlidingSyncHandler:
|
||||
# or `limited` mean for clients that interpret them correctly. In future this
|
||||
# behavior is almost certainly going to change.
|
||||
#
|
||||
# TODO: Also handle changes to `required_state`
|
||||
from_bound = None
|
||||
initial = True
|
||||
ignore_timeline_bound = False
|
||||
if from_token and not newly_joined:
|
||||
if from_token and not newly_joined and not state_reset_out_of_room:
|
||||
room_status = previous_connection_state.rooms.have_sent_room(room_id)
|
||||
if room_status.status == HaveSentRoomFlag.LIVE:
|
||||
from_bound = from_token.stream_token.room_key
|
||||
@@ -544,7 +570,6 @@ class SlidingSyncHandler:
|
||||
|
||||
log_kv({"sliding_sync.room_status": room_status})
|
||||
|
||||
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
|
||||
if prev_room_sync_config is not None:
|
||||
# Check if the timeline limit has increased, if so ignore the
|
||||
# timeline bound and record the change (see "XXX: Odd behavior"
|
||||
@@ -555,8 +580,6 @@ class SlidingSyncHandler:
|
||||
):
|
||||
ignore_timeline_bound = True
|
||||
|
||||
# TODO: Check for changes in `required_state``
|
||||
|
||||
log_kv(
|
||||
{
|
||||
"sliding_sync.from_bound": from_bound,
|
||||
@@ -732,12 +755,6 @@ class SlidingSyncHandler:
|
||||
|
||||
stripped_state.append(strip_event(invite_or_knock_event))
|
||||
|
||||
# TODO: Handle state resets. For example, if we see
|
||||
# `room_membership_for_user_at_to_token.event_id=None and
|
||||
# room_membership_for_user_at_to_token.membership is not None`, we should
|
||||
# indicate to the client that a state reset happened. Perhaps we should indicate
|
||||
# this by setting `initial: True` and empty `required_state`.
|
||||
|
||||
# Get the changes to current state in the token range from the
|
||||
# `current_state_delta_stream` table.
|
||||
#
|
||||
@@ -784,32 +801,10 @@ class SlidingSyncHandler:
|
||||
):
|
||||
avatar_changed = True
|
||||
|
||||
# We only need the room summary for calculating heroes, however if we do
|
||||
# fetch it then we can use it to calculate `joined_count` and
|
||||
# `invited_count`.
|
||||
room_membership_summary: Optional[Mapping[str, MemberSummary]] = None
|
||||
empty_membership_summary = MemberSummary([], 0)
|
||||
# We need the room summary for:
|
||||
# - Always for initial syncs (or the first time we send down the room)
|
||||
# - When the room has no name, we need `heroes`
|
||||
# - When the membership has changed so we need to give updated `heroes` and
|
||||
# `joined_count`/`invited_count`.
|
||||
#
|
||||
# Ideally, instead of just looking at `name_changed`, we'd check if the room
|
||||
# name is not set but this is a good enough approximation that saves us from
|
||||
# having to pull out the full event. This just means, we're generating the
|
||||
# summary whenever the room name changes instead of only when it changes to
|
||||
# `None`.
|
||||
if initial or name_changed or membership_changed:
|
||||
# We can't trace the function directly because it's cached and the `@cached`
|
||||
# decorator doesn't mix with `@trace` yet.
|
||||
with start_active_span("get_room_summary"):
|
||||
if room_membership_for_user_at_to_token.membership in (
|
||||
Membership.LEAVE,
|
||||
Membership.BAN,
|
||||
):
|
||||
# TODO: Figure out how to get the membership summary for left/banned rooms
|
||||
room_membership_summary = {}
|
||||
else:
|
||||
room_membership_summary = await self.store.get_room_summary(room_id)
|
||||
# TODO: Reverse/rewind back to the `to_token`
|
||||
|
||||
# `heroes` are required if the room name is not set.
|
||||
#
|
||||
@@ -828,11 +823,45 @@ class SlidingSyncHandler:
|
||||
# get them on initial syncs (or the first time we send down the room) or if the
|
||||
# membership has changed which may change the heroes.
|
||||
if name_event_id is None and (initial or (not initial and membership_changed)):
|
||||
assert room_membership_summary is not None
|
||||
# We need the room summary to extract the heroes from
|
||||
if room_membership_for_user_at_to_token.membership != Membership.JOIN:
|
||||
# TODO: Figure out how to get the membership summary for left/banned rooms
|
||||
# For invite/knock rooms we don't include the information.
|
||||
room_membership_summary = {}
|
||||
else:
|
||||
room_membership_summary = await self.store.get_room_summary(room_id)
|
||||
# TODO: Reverse/rewind back to the `to_token`
|
||||
|
||||
hero_user_ids = extract_heroes_from_room_summary(
|
||||
room_membership_summary, me=user.to_string()
|
||||
)
|
||||
|
||||
# Fetch the membership counts for rooms we're joined to.
|
||||
#
|
||||
# Similarly to other metadata, we only need to calculate the member
|
||||
# counts if this is an initial sync or the memberships have changed.
|
||||
joined_count: Optional[int] = None
|
||||
invited_count: Optional[int] = None
|
||||
if (
|
||||
initial or membership_changed
|
||||
) and room_membership_for_user_at_to_token.membership == Membership.JOIN:
|
||||
# If we have the room summary (because we calculated heroes above)
|
||||
# then we can simply pull the counts from there.
|
||||
if room_membership_summary is not None:
|
||||
empty_membership_summary = MemberSummary([], 0)
|
||||
|
||||
joined_count = room_membership_summary.get(
|
||||
Membership.JOIN, empty_membership_summary
|
||||
).count
|
||||
|
||||
invited_count = room_membership_summary.get(
|
||||
Membership.INVITE, empty_membership_summary
|
||||
).count
|
||||
else:
|
||||
member_counts = await self.store.get_member_counts(room_id)
|
||||
joined_count = member_counts.get(Membership.JOIN, 0)
|
||||
invited_count = member_counts.get(Membership.INVITE, 0)
|
||||
|
||||
# Fetch the `required_state` for the room
|
||||
#
|
||||
# No `required_state` for invite/knock rooms (just `stripped_state`)
|
||||
@@ -964,6 +993,10 @@ class SlidingSyncHandler:
|
||||
include_others=required_state_filter.include_others,
|
||||
)
|
||||
|
||||
# The required state map to store in the room sync config, if it has
|
||||
# changed.
|
||||
changed_required_state_map: Optional[Mapping[str, AbstractSet[str]]] = None
|
||||
|
||||
# We can return all of the state that was requested if this was the first
|
||||
# time we've sent the room down this connection.
|
||||
room_state: StateMap[EventBase] = {}
|
||||
@@ -977,6 +1010,29 @@ class SlidingSyncHandler:
|
||||
else:
|
||||
assert from_bound is not None
|
||||
|
||||
if prev_room_sync_config is not None:
|
||||
# Check if there are any changes to the required state config
|
||||
# that we need to handle.
|
||||
changed_required_state_map, added_state_filter = (
|
||||
_required_state_changes(
|
||||
user.to_string(),
|
||||
previous_room_config=prev_room_sync_config,
|
||||
room_sync_config=room_sync_config,
|
||||
state_deltas=room_state_delta_id_map,
|
||||
)
|
||||
)
|
||||
|
||||
if added_state_filter:
|
||||
# Some state entries got added, so we pull out the current
|
||||
# state for them. If we don't do this we'd only send down new deltas.
|
||||
state_ids = await self.get_current_state_ids_at(
|
||||
room_id=room_id,
|
||||
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
|
||||
state_filter=added_state_filter,
|
||||
to_token=to_token,
|
||||
)
|
||||
room_state_delta_id_map.update(state_ids)
|
||||
|
||||
events = await self.store.get_events(
|
||||
state_filter.filter_state(room_state_delta_id_map).values()
|
||||
)
|
||||
@@ -1024,25 +1080,64 @@ class SlidingSyncHandler:
|
||||
)
|
||||
)
|
||||
|
||||
# Figure out the last bump event in the room
|
||||
#
|
||||
# By default, just choose the membership event position for any non-join membership
|
||||
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
|
||||
# Figure out the last bump event in the room. If the bump stamp hasn't
|
||||
# changed we omit it from the response.
|
||||
bump_stamp = None
|
||||
|
||||
always_return_bump_stamp = (
|
||||
# We use the membership event position for any non-join
|
||||
room_membership_for_user_at_to_token.membership != Membership.JOIN
|
||||
# We didn't fetch any timeline events but we should still check for
|
||||
# a bump_stamp that might be somewhere
|
||||
or limited is None
|
||||
# There might be a bump event somewhere before the timeline events
|
||||
# that we fetched, that we didn't previously send down
|
||||
or limited is True
|
||||
# Always give the client some frame of reference if this is the
|
||||
# first time they are seeing the room down the connection
|
||||
or initial
|
||||
)
|
||||
|
||||
# If we're joined to the room, we need to find the last bump event before the
|
||||
# `to_token`
|
||||
if room_membership_for_user_at_to_token.membership == Membership.JOIN:
|
||||
# Try and get a bump stamp, if not we just fall back to the
|
||||
# membership token.
|
||||
# Try and get a bump stamp
|
||||
new_bump_stamp = await self._get_bump_stamp(
|
||||
room_id, to_token, timeline_events
|
||||
room_id,
|
||||
to_token,
|
||||
timeline_events,
|
||||
check_outside_timeline=always_return_bump_stamp,
|
||||
)
|
||||
if new_bump_stamp is not None:
|
||||
bump_stamp = new_bump_stamp
|
||||
|
||||
unstable_expanded_timeline = False
|
||||
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
|
||||
if bump_stamp is None and always_return_bump_stamp:
|
||||
# By default, just choose the membership event position for any non-join membership
|
||||
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
|
||||
|
||||
if bump_stamp is not None and bump_stamp < 0:
|
||||
# We never want to send down negative stream orderings, as you can't
|
||||
# sensibly compare positive and negative stream orderings (they have
|
||||
# different meanings).
|
||||
#
|
||||
# A negative bump stamp here can only happen if the stream ordering
|
||||
# of the membership event is negative (and there are no further bump
|
||||
# stamps), which can happen if the server leaves and deletes a room,
|
||||
# and then rejoins it.
|
||||
#
|
||||
# To deal with this, we just set the bump stamp to zero, which will
|
||||
# shove this room to the bottom of the list. This is OK as the
|
||||
# moment a new message happens in the room it will get put into a
|
||||
# sensible order again.
|
||||
bump_stamp = 0
|
||||
|
||||
room_sync_required_state_map_to_persist = room_sync_config.required_state_map
|
||||
if changed_required_state_map:
|
||||
room_sync_required_state_map_to_persist = changed_required_state_map
|
||||
|
||||
# Record the `room_sync_config` if we're `ignore_timeline_bound` (which means
|
||||
# that the `timeline_limit` has increased)
|
||||
unstable_expanded_timeline = False
|
||||
if ignore_timeline_bound:
|
||||
# FIXME: We signal the fact that we're sending down more events to
|
||||
# the client by setting `unstable_expanded_timeline` to true (see
|
||||
@@ -1051,7 +1146,7 @@ class SlidingSyncHandler:
|
||||
|
||||
new_connection_state.room_configs[room_id] = RoomSyncConfig(
|
||||
timeline_limit=room_sync_config.timeline_limit,
|
||||
required_state_map=room_sync_config.required_state_map,
|
||||
required_state_map=room_sync_required_state_map_to_persist,
|
||||
)
|
||||
elif prev_room_sync_config is not None:
|
||||
# If the result is `limited` then we need to record that the
|
||||
@@ -1080,30 +1175,20 @@ class SlidingSyncHandler:
|
||||
):
|
||||
new_connection_state.room_configs[room_id] = RoomSyncConfig(
|
||||
timeline_limit=room_sync_config.timeline_limit,
|
||||
required_state_map=room_sync_config.required_state_map,
|
||||
required_state_map=room_sync_required_state_map_to_persist,
|
||||
)
|
||||
|
||||
# TODO: Record changes in required_state.
|
||||
elif changed_required_state_map is not None:
|
||||
new_connection_state.room_configs[room_id] = RoomSyncConfig(
|
||||
timeline_limit=room_sync_config.timeline_limit,
|
||||
required_state_map=room_sync_required_state_map_to_persist,
|
||||
)
|
||||
|
||||
else:
|
||||
new_connection_state.room_configs[room_id] = room_sync_config
|
||||
|
||||
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
|
||||
|
||||
joined_count: Optional[int] = None
|
||||
if initial or membership_changed:
|
||||
assert room_membership_summary is not None
|
||||
joined_count = room_membership_summary.get(
|
||||
Membership.JOIN, empty_membership_summary
|
||||
).count
|
||||
|
||||
invited_count: Optional[int] = None
|
||||
if initial or membership_changed:
|
||||
assert room_membership_summary is not None
|
||||
invited_count = room_membership_summary.get(
|
||||
Membership.INVITE, empty_membership_summary
|
||||
).count
|
||||
|
||||
return SlidingSyncResult.RoomResult(
|
||||
name=room_name,
|
||||
avatar=room_avatar,
|
||||
@@ -1130,14 +1215,23 @@ class SlidingSyncHandler:
|
||||
|
||||
@trace
|
||||
async def _get_bump_stamp(
|
||||
self, room_id: str, to_token: StreamToken, timeline: List[EventBase]
|
||||
self,
|
||||
room_id: str,
|
||||
to_token: StreamToken,
|
||||
timeline: List[EventBase],
|
||||
check_outside_timeline: bool,
|
||||
) -> Optional[int]:
|
||||
"""Get a bump stamp for the room, if we have a bump event
|
||||
"""Get a bump stamp for the room, if we have a bump event and it has
|
||||
changed.
|
||||
|
||||
Args:
|
||||
room_id
|
||||
to_token: The upper bound of token to return
|
||||
timeline: The list of events we have fetched.
|
||||
limited: If the timeline was limited.
|
||||
check_outside_timeline: Whether we need to check for bump stamp for
|
||||
events before the timeline if we didn't find a bump stamp in
|
||||
the timeline events.
|
||||
"""
|
||||
|
||||
# First check the timeline events we're returning to see if one of
|
||||
@@ -1157,6 +1251,11 @@ class SlidingSyncHandler:
|
||||
if new_bump_stamp > 0:
|
||||
return new_bump_stamp
|
||||
|
||||
if not check_outside_timeline:
|
||||
# If we are not a limited sync, then we know the bump stamp can't
|
||||
# have changed.
|
||||
return None
|
||||
|
||||
# We can quickly query for the latest bump event in the room using the
|
||||
# sliding sync tables.
|
||||
latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room(
|
||||
@@ -1216,3 +1315,185 @@ class SlidingSyncHandler:
|
||||
return new_bump_event_pos.stream
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _required_state_changes(
|
||||
user_id: str,
|
||||
*,
|
||||
previous_room_config: "RoomSyncConfig",
|
||||
room_sync_config: RoomSyncConfig,
|
||||
state_deltas: StateMap[str],
|
||||
) -> Tuple[Optional[Mapping[str, AbstractSet[str]]], StateFilter]:
|
||||
"""Calculates the changes between the required state room config from the
|
||||
previous requests compared with the current request.
|
||||
|
||||
This does two things. First, it calculates if we need to update the room
|
||||
config due to changes to required state. Secondly, it works out which state
|
||||
entries we need to pull from current state and return due to the state entry
|
||||
now appearing in the required state when it previously wasn't (on top of the
|
||||
state deltas).
|
||||
|
||||
This function tries to ensure to handle the case where a state entry is
|
||||
added, removed and then added again to the required state. In that case we
|
||||
only want to re-send that entry down sync if it has changed.
|
||||
|
||||
Returns:
|
||||
A 2-tuple of updated required state config (or None if there is no update)
|
||||
and the state filter to use to fetch extra current state that we need to
|
||||
return.
|
||||
"""
|
||||
|
||||
prev_required_state_map = previous_room_config.required_state_map
|
||||
request_required_state_map = room_sync_config.required_state_map
|
||||
|
||||
if prev_required_state_map == request_required_state_map:
|
||||
# There has been no change. Return immediately.
|
||||
return None, StateFilter.none()
|
||||
|
||||
prev_wildcard = prev_required_state_map.get(StateValues.WILDCARD, set())
|
||||
request_wildcard = request_required_state_map.get(StateValues.WILDCARD, set())
|
||||
|
||||
# If we were previously fetching everything ("*", "*"), always update the effective
|
||||
# room required state config to match the request. And since we we're previously
|
||||
# already fetching everything, we don't have to fetch anything now that they've
|
||||
# narrowed.
|
||||
if StateValues.WILDCARD in prev_wildcard:
|
||||
return request_required_state_map, StateFilter.none()
|
||||
|
||||
# If a event type wildcard has been added or removed we don't try and do
|
||||
# anything fancy, and instead always update the effective room required
|
||||
# state config to match the request.
|
||||
if request_wildcard - prev_wildcard:
|
||||
# Some keys were added, so we need to fetch everything
|
||||
return request_required_state_map, StateFilter.all()
|
||||
if prev_wildcard - request_wildcard:
|
||||
# Keys were only removed, so we don't have to fetch everything.
|
||||
return request_required_state_map, StateFilter.none()
|
||||
|
||||
# Contains updates to the required state map compared with the previous room
|
||||
# config. This has the same format as `RoomSyncConfig.required_state`
|
||||
changes: Dict[str, AbstractSet[str]] = {}
|
||||
|
||||
# The set of types/state keys that we need to fetch and return to the
|
||||
# client. Passed to `StateFilter.from_types(...)`
|
||||
added: List[Tuple[str, Optional[str]]] = []
|
||||
|
||||
# First we calculate what, if anything, has been *added*.
|
||||
for event_type in (
|
||||
prev_required_state_map.keys() | request_required_state_map.keys()
|
||||
):
|
||||
old_state_keys = prev_required_state_map.get(event_type, set())
|
||||
request_state_keys = request_required_state_map.get(event_type, set())
|
||||
|
||||
if old_state_keys == request_state_keys:
|
||||
# No change to this type
|
||||
continue
|
||||
|
||||
if not request_state_keys - old_state_keys:
|
||||
# Nothing *added*, so we skip. Removals happen below.
|
||||
continue
|
||||
|
||||
# Always update changes to include the newly added keys
|
||||
changes[event_type] = request_state_keys
|
||||
|
||||
if StateValues.WILDCARD in old_state_keys:
|
||||
# We were previously fetching everything for this type, so we don't need to
|
||||
# fetch anything new.
|
||||
continue
|
||||
|
||||
# Record the new state keys to fetch for this type.
|
||||
if StateValues.WILDCARD in request_state_keys:
|
||||
# If we have added a wildcard then we always just fetch everything.
|
||||
added.append((event_type, None))
|
||||
else:
|
||||
for state_key in request_state_keys - old_state_keys:
|
||||
if state_key == StateValues.ME:
|
||||
added.append((event_type, user_id))
|
||||
elif state_key == StateValues.LAZY:
|
||||
# We handle lazy loading separately (outside this function),
|
||||
# so don't need to explicitly add anything here.
|
||||
#
|
||||
# LAZY values should also be ignore for event types that are
|
||||
# not membership.
|
||||
pass
|
||||
else:
|
||||
added.append((event_type, state_key))
|
||||
|
||||
added_state_filter = StateFilter.from_types(added)
|
||||
|
||||
# Convert the list of state deltas to map from type to state_keys that have
|
||||
# changed.
|
||||
changed_types_to_state_keys: Dict[str, Set[str]] = {}
|
||||
for event_type, state_key in state_deltas:
|
||||
changed_types_to_state_keys.setdefault(event_type, set()).add(state_key)
|
||||
|
||||
# Figure out what changes we need to apply to the effective required state
|
||||
# config.
|
||||
for event_type, changed_state_keys in changed_types_to_state_keys.items():
|
||||
old_state_keys = prev_required_state_map.get(event_type, set())
|
||||
request_state_keys = request_required_state_map.get(event_type, set())
|
||||
|
||||
if old_state_keys == request_state_keys:
|
||||
# No change.
|
||||
continue
|
||||
|
||||
if request_state_keys - old_state_keys:
|
||||
# We've expanded the set of state keys, so we just clobber the
|
||||
# current set with the new set.
|
||||
#
|
||||
# We could also ensure that we keep entries where the state hasn't
|
||||
# changed, but are no longer in the requested required state, but
|
||||
# that's a sufficient edge case that we can ignore (as its only a
|
||||
# performance optimization).
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
old_state_key_wildcard = StateValues.WILDCARD in old_state_keys
|
||||
request_state_key_wildcard = StateValues.WILDCARD in request_state_keys
|
||||
|
||||
if old_state_key_wildcard != request_state_key_wildcard:
|
||||
# If a state_key wildcard has been added or removed, we always update the
|
||||
# effective room required state config to match the request.
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
if event_type == EventTypes.Member:
|
||||
old_state_key_lazy = StateValues.LAZY in old_state_keys
|
||||
request_state_key_lazy = StateValues.LAZY in request_state_keys
|
||||
|
||||
if old_state_key_lazy != request_state_key_lazy:
|
||||
# If a "$LAZY" has been added or removed we always update the effective room
|
||||
# required state config to match the request.
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
# Handle "$ME" values by adding "$ME" if the state key matches the user
|
||||
# ID.
|
||||
if user_id in changed_state_keys:
|
||||
changed_state_keys.add(StateValues.ME)
|
||||
|
||||
# At this point there are no wildcards and no additions to the set of
|
||||
# state keys requested, only deletions.
|
||||
#
|
||||
# We only remove state keys from the effective state if they've been
|
||||
# removed from the request *and* the state has changed. This ensures
|
||||
# that if a client removes and then re-adds a state key, we only send
|
||||
# down the associated current state event if its changed (rather than
|
||||
# sending down the same event twice).
|
||||
invalidated = (old_state_keys - request_state_keys) & changed_state_keys
|
||||
if invalidated:
|
||||
changes[event_type] = old_state_keys - invalidated
|
||||
|
||||
if changes:
|
||||
# Update the required state config based on the changes.
|
||||
new_required_state_map = dict(prev_required_state_map)
|
||||
for event_type, state_keys in changes.items():
|
||||
if state_keys:
|
||||
new_required_state_map[event_type] = state_keys
|
||||
else:
|
||||
# Remove entries with empty state keys.
|
||||
new_required_state_map.pop(event_type, None)
|
||||
|
||||
return new_required_state_map, added_state_filter
|
||||
else:
|
||||
return None, added_state_filter
|
||||
|
||||
@@ -14,7 +14,18 @@
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, AbstractSet, Dict, Mapping, Optional, Sequence, Set
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AbstractSet,
|
||||
ChainMap,
|
||||
Dict,
|
||||
Mapping,
|
||||
MutableMapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
cast,
|
||||
)
|
||||
|
||||
from typing_extensions import assert_never
|
||||
|
||||
@@ -38,6 +49,7 @@ from synapse.types.handlers.sliding_sync import (
|
||||
SlidingSyncConfig,
|
||||
SlidingSyncResult,
|
||||
)
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -106,6 +118,8 @@ class SlidingSyncExtensionHandler:
|
||||
if sync_config.extensions.account_data is not None:
|
||||
account_data_response = await self.get_account_data_extension_response(
|
||||
sync_config=sync_config,
|
||||
previous_connection_state=previous_connection_state,
|
||||
new_connection_state=new_connection_state,
|
||||
actual_lists=actual_lists,
|
||||
actual_room_ids=actual_room_ids,
|
||||
account_data_request=sync_config.extensions.account_data,
|
||||
@@ -348,6 +362,8 @@ class SlidingSyncExtensionHandler:
|
||||
async def get_account_data_extension_response(
|
||||
self,
|
||||
sync_config: SlidingSyncConfig,
|
||||
previous_connection_state: "PerConnectionState",
|
||||
new_connection_state: "MutablePerConnectionState",
|
||||
actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList],
|
||||
actual_room_ids: Set[str],
|
||||
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
|
||||
@@ -380,29 +396,39 @@ class SlidingSyncExtensionHandler:
|
||||
)
|
||||
)
|
||||
|
||||
# TODO: This should take into account the `from_token` and `to_token`
|
||||
have_push_rules_changed = await self.store.have_push_rules_changed_for_user(
|
||||
user_id, from_token.stream_token.push_rules_key
|
||||
)
|
||||
if have_push_rules_changed:
|
||||
global_account_data_map = dict(global_account_data_map)
|
||||
# TODO: This should take into account the `from_token` and `to_token`
|
||||
global_account_data_map[
|
||||
AccountDataTypes.PUSH_RULES
|
||||
] = await self.push_rules_handler.push_rules_for_user(sync_config.user)
|
||||
else:
|
||||
# TODO: This should take into account the `to_token`
|
||||
all_global_account_data = await self.store.get_global_account_data_for_user(
|
||||
user_id
|
||||
immutable_global_account_data_map = (
|
||||
await self.store.get_global_account_data_for_user(user_id)
|
||||
)
|
||||
|
||||
global_account_data_map = dict(all_global_account_data)
|
||||
# TODO: This should take into account the `to_token`
|
||||
global_account_data_map[
|
||||
AccountDataTypes.PUSH_RULES
|
||||
] = await self.push_rules_handler.push_rules_for_user(sync_config.user)
|
||||
# Use a `ChainMap` to avoid copying the immutable data from the cache
|
||||
global_account_data_map = ChainMap(
|
||||
{
|
||||
# TODO: This should take into account the `to_token`
|
||||
AccountDataTypes.PUSH_RULES: await self.push_rules_handler.push_rules_for_user(
|
||||
sync_config.user
|
||||
)
|
||||
},
|
||||
# Cast is safe because `ChainMap` only mutates the top-most map,
|
||||
# see https://github.com/python/typeshed/issues/8430
|
||||
cast(
|
||||
MutableMapping[str, JsonMapping], immutable_global_account_data_map
|
||||
),
|
||||
)
|
||||
|
||||
# Fetch room account data
|
||||
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
|
||||
#
|
||||
account_data_by_room_map: MutableMapping[str, Mapping[str, JsonMapping]] = {}
|
||||
relevant_room_ids = self.find_relevant_room_ids_for_extension(
|
||||
requested_lists=account_data_request.lists,
|
||||
requested_room_ids=account_data_request.rooms,
|
||||
@@ -410,25 +436,154 @@ class SlidingSyncExtensionHandler:
|
||||
actual_room_ids=actual_room_ids,
|
||||
)
|
||||
if len(relevant_room_ids) > 0:
|
||||
# We need to handle the different cases depending on if we have sent
|
||||
# down account data previously or not, so we split the relevant
|
||||
# rooms up into different collections based on status.
|
||||
live_rooms = set()
|
||||
previously_rooms: Dict[str, int] = {}
|
||||
initial_rooms = set()
|
||||
|
||||
for room_id in relevant_room_ids:
|
||||
if not from_token:
|
||||
initial_rooms.add(room_id)
|
||||
continue
|
||||
|
||||
room_status = previous_connection_state.account_data.have_sent_room(
|
||||
room_id
|
||||
)
|
||||
if room_status.status == HaveSentRoomFlag.LIVE:
|
||||
live_rooms.add(room_id)
|
||||
elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
|
||||
assert room_status.last_token is not None
|
||||
previously_rooms[room_id] = room_status.last_token
|
||||
elif room_status.status == HaveSentRoomFlag.NEVER:
|
||||
initial_rooms.add(room_id)
|
||||
else:
|
||||
assert_never(room_status.status)
|
||||
|
||||
# We fetch all room account data since the from_token. This is so
|
||||
# that we can record which rooms have updates that haven't been sent
|
||||
# down.
|
||||
#
|
||||
# Mapping from room_id to mapping of `type` to `content` of room account
|
||||
# data events.
|
||||
all_updates_since_the_from_token: Mapping[
|
||||
str, Mapping[str, JsonMapping]
|
||||
] = {}
|
||||
if from_token is not None:
|
||||
# TODO: This should take into account the `from_token` and `to_token`
|
||||
account_data_by_room_map = (
|
||||
all_updates_since_the_from_token = (
|
||||
await self.store.get_updated_room_account_data_for_user(
|
||||
user_id, from_token.stream_token.account_data_key
|
||||
)
|
||||
)
|
||||
else:
|
||||
# TODO: This should take into account the `to_token`
|
||||
account_data_by_room_map = (
|
||||
await self.store.get_room_account_data_for_user(user_id)
|
||||
|
||||
# Add room tags
|
||||
#
|
||||
# TODO: This should take into account the `from_token` and `to_token`
|
||||
tags_by_room = await self.store.get_updated_tags(
|
||||
user_id, from_token.stream_token.account_data_key
|
||||
)
|
||||
for room_id, tags in tags_by_room.items():
|
||||
all_updates_since_the_from_token.setdefault(room_id, {})[
|
||||
AccountDataTypes.TAG
|
||||
] = {"tags": tags}
|
||||
|
||||
# For live rooms we just get the updates from `all_updates_since_the_from_token`
|
||||
if live_rooms:
|
||||
for room_id in all_updates_since_the_from_token.keys() & live_rooms:
|
||||
account_data_by_room_map[room_id] = (
|
||||
all_updates_since_the_from_token[room_id]
|
||||
)
|
||||
|
||||
# For previously and initial rooms we query each room individually.
|
||||
if previously_rooms or initial_rooms:
|
||||
|
||||
async def handle_previously(room_id: str) -> None:
|
||||
# Either get updates or all account data in the room
|
||||
# depending on if the room state is PREVIOUSLY or NEVER.
|
||||
previous_token = previously_rooms.get(room_id)
|
||||
if previous_token is not None:
|
||||
room_account_data = await (
|
||||
self.store.get_updated_room_account_data_for_user_for_room(
|
||||
user_id=user_id,
|
||||
room_id=room_id,
|
||||
from_stream_id=previous_token,
|
||||
to_stream_id=to_token.account_data_key,
|
||||
)
|
||||
)
|
||||
|
||||
# Add room tags
|
||||
changed = await self.store.has_tags_changed_for_room(
|
||||
user_id=user_id,
|
||||
room_id=room_id,
|
||||
from_stream_id=previous_token,
|
||||
to_stream_id=to_token.account_data_key,
|
||||
)
|
||||
if changed:
|
||||
# XXX: Ideally, this should take into account the `to_token`
|
||||
# and return the set of tags at that time but we don't track
|
||||
# changes to tags so we just have to return all tags for the
|
||||
# room.
|
||||
immutable_tag_map = await self.store.get_tags_for_room(
|
||||
user_id, room_id
|
||||
)
|
||||
room_account_data[AccountDataTypes.TAG] = {
|
||||
"tags": immutable_tag_map
|
||||
}
|
||||
|
||||
# Only add an entry if there were any updates.
|
||||
if room_account_data:
|
||||
account_data_by_room_map[room_id] = room_account_data
|
||||
else:
|
||||
# TODO: This should take into account the `to_token`
|
||||
immutable_room_account_data = (
|
||||
await self.store.get_account_data_for_room(user_id, room_id)
|
||||
)
|
||||
|
||||
# Add room tags
|
||||
#
|
||||
# XXX: Ideally, this should take into account the `to_token`
|
||||
# and return the set of tags at that time but we don't track
|
||||
# changes to tags so we just have to return all tags for the
|
||||
# room.
|
||||
immutable_tag_map = await self.store.get_tags_for_room(
|
||||
user_id, room_id
|
||||
)
|
||||
|
||||
account_data_by_room_map[room_id] = ChainMap(
|
||||
{AccountDataTypes.TAG: {"tags": immutable_tag_map}}
|
||||
if immutable_tag_map
|
||||
else {},
|
||||
# Cast is safe because `ChainMap` only mutates the top-most map,
|
||||
# see https://github.com/python/typeshed/issues/8430
|
||||
cast(
|
||||
MutableMapping[str, JsonMapping],
|
||||
immutable_room_account_data,
|
||||
),
|
||||
)
|
||||
|
||||
# We handle these rooms concurrently to speed it up.
|
||||
await concurrently_execute(
|
||||
handle_previously,
|
||||
previously_rooms.keys() | initial_rooms,
|
||||
limit=20,
|
||||
)
|
||||
|
||||
# Filter down to the relevant rooms
|
||||
account_data_by_room_map = {
|
||||
room_id: account_data_map
|
||||
for room_id, account_data_map in account_data_by_room_map.items()
|
||||
if room_id in relevant_room_ids
|
||||
}
|
||||
# Now record which rooms are now up to data, and which rooms have
|
||||
# pending updates to send.
|
||||
new_connection_state.account_data.record_sent_rooms(previously_rooms.keys())
|
||||
new_connection_state.account_data.record_sent_rooms(initial_rooms)
|
||||
missing_updates = (
|
||||
all_updates_since_the_from_token.keys() - relevant_room_ids
|
||||
)
|
||||
if missing_updates:
|
||||
# If we have missing updates then we must have had a from_token.
|
||||
assert from_token is not None
|
||||
|
||||
new_connection_state.account_data.record_unsent_rooms(
|
||||
missing_updates, from_token.stream_token.account_data_key
|
||||
)
|
||||
|
||||
return SlidingSyncResult.Extensions.AccountDataExtension(
|
||||
global_account_data_map=global_account_data_map,
|
||||
@@ -534,7 +689,10 @@ class SlidingSyncExtensionHandler:
|
||||
# For rooms we've previously sent down, but aren't up to date, we
|
||||
# need to use the from token from the room status.
|
||||
if previously_rooms:
|
||||
for room_id, receipt_token in previously_rooms.items():
|
||||
# Fetch any missing rooms concurrently.
|
||||
|
||||
async def handle_previously_room(room_id: str) -> None:
|
||||
receipt_token = previously_rooms[room_id]
|
||||
# TODO: Limit the number of receipts we're about to send down
|
||||
# for the room, if its too many we should TODO
|
||||
previously_receipts = (
|
||||
@@ -546,6 +704,10 @@ class SlidingSyncExtensionHandler:
|
||||
)
|
||||
fetched_receipts.extend(previously_receipts)
|
||||
|
||||
await concurrently_execute(
|
||||
handle_previously_room, previously_rooms.keys(), 20
|
||||
)
|
||||
|
||||
if initial_rooms:
|
||||
# We also always send down receipts for the current user.
|
||||
user_receipts = (
|
||||
@@ -602,9 +764,10 @@ class SlidingSyncExtensionHandler:
|
||||
|
||||
room_id_to_receipt_map[room_id] = {"type": type, "content": content}
|
||||
|
||||
# Now we update the per-connection state to track which receipts we have
|
||||
# and haven't sent down.
|
||||
new_connection_state.receipts.record_sent_rooms(relevant_room_ids)
|
||||
# Update the per-connection state to track which rooms we have sent
|
||||
# all the receipts for.
|
||||
new_connection_state.receipts.record_sent_rooms(previously_rooms.keys())
|
||||
new_connection_state.receipts.record_sent_rooms(initial_rooms)
|
||||
|
||||
if from_token:
|
||||
# Now find the set of rooms that may have receipts that we're not sending
|
||||
|
||||
@@ -27,6 +27,7 @@ from typing import (
|
||||
Set,
|
||||
Tuple,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
import attr
|
||||
@@ -39,6 +40,7 @@ from synapse.api.constants import (
|
||||
EventTypes,
|
||||
Membership,
|
||||
)
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events import StrippedStateEvent
|
||||
from synapse.events.utils import parse_stripped_state_event
|
||||
from synapse.logging.opentracing import start_active_span, trace
|
||||
@@ -78,6 +80,12 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Sentinel(enum.Enum):
|
||||
# defining a sentinel in this way allows mypy to correctly handle the
|
||||
# type of a dictionary lookup and subsequent type narrowing.
|
||||
UNSET_SENTINEL = object()
|
||||
|
||||
|
||||
# Helper definition for the types that we might return. We do this to avoid
|
||||
# copying data between types (which can be expensive for many rooms).
|
||||
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]
|
||||
@@ -115,11 +123,18 @@ class SlidingSyncInterestedRooms:
|
||||
newly_left_rooms: AbstractSet[str]
|
||||
dm_room_ids: AbstractSet[str]
|
||||
|
||||
|
||||
class Sentinel(enum.Enum):
|
||||
# defining a sentinel in this way allows mypy to correctly handle the
|
||||
# type of a dictionary lookup and subsequent type narrowing.
|
||||
UNSET_SENTINEL = object()
|
||||
@staticmethod
|
||||
def empty() -> "SlidingSyncInterestedRooms":
|
||||
return SlidingSyncInterestedRooms(
|
||||
lists={},
|
||||
relevant_room_map={},
|
||||
relevant_rooms_to_send_map={},
|
||||
all_rooms=set(),
|
||||
room_membership_for_user_map={},
|
||||
newly_joined_rooms=set(),
|
||||
newly_left_rooms=set(),
|
||||
dm_room_ids=set(),
|
||||
)
|
||||
|
||||
|
||||
def filter_membership_for_sync(
|
||||
@@ -179,6 +194,14 @@ class SlidingSyncRoomLists:
|
||||
from_token: Optional[StreamToken],
|
||||
) -> SlidingSyncInterestedRooms:
|
||||
"""Fetch the set of rooms that match the request"""
|
||||
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
|
||||
has_room_subscriptions = (
|
||||
sync_config.room_subscriptions is not None
|
||||
and len(sync_config.room_subscriptions) > 0
|
||||
)
|
||||
|
||||
if not has_lists and not has_room_subscriptions:
|
||||
return SlidingSyncInterestedRooms.empty()
|
||||
|
||||
if await self.store.have_finished_sliding_sync_background_jobs():
|
||||
return await self._compute_interested_rooms_new_tables(
|
||||
@@ -218,43 +241,55 @@ class SlidingSyncRoomLists:
|
||||
# include rooms that are outside the list ranges.
|
||||
all_rooms: Set[str] = set()
|
||||
|
||||
# Note: this won't include rooms the user have left themselves. We add
|
||||
# back in rooms that the user left since `from_token` below.
|
||||
# Note: this won't include rooms the user has left themselves. We add back
|
||||
# `newly_left` rooms below. This is more efficient than fetching all rooms and
|
||||
# then filtering out the old left rooms.
|
||||
room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user(
|
||||
user_id
|
||||
)
|
||||
|
||||
# Remove invites from ignored users
|
||||
ignored_users = await self.store.ignored_users(user_id)
|
||||
if ignored_users:
|
||||
# TODO: It would be nice to avoid these copies
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
# Make a copy so we don't run into an error: `dictionary changed size during
|
||||
# iteration`, when we remove items
|
||||
for room_id in list(room_membership_for_user_map.keys()):
|
||||
room_for_user_sliding_sync = room_membership_for_user_map[room_id]
|
||||
if (
|
||||
room_for_user_sliding_sync.membership == Membership.INVITE
|
||||
and room_for_user_sliding_sync.sender in ignored_users
|
||||
):
|
||||
room_membership_for_user_map.pop(room_id, None)
|
||||
|
||||
changes = await self._get_rewind_changes_to_current_membership_to_token(
|
||||
sync_config.user, room_membership_for_user_map, to_token=to_token
|
||||
)
|
||||
if changes:
|
||||
# TODO: It would be nice to avoid these copies
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
for room_id, change in changes.items():
|
||||
if change is None:
|
||||
# Remove rooms that the user joined after the `to_token`
|
||||
room_membership_for_user_map.pop(room_id)
|
||||
room_membership_for_user_map.pop(room_id, None)
|
||||
continue
|
||||
|
||||
current_room_entry = await self.store.get_sliding_sync_room_for_user(
|
||||
user_id, room_id
|
||||
)
|
||||
if current_room_entry is None:
|
||||
# We should always have an entry, even if we get state reset
|
||||
# out of the room.
|
||||
logger.error("Can't find room for user: %s / %s", user_id, room_id)
|
||||
continue
|
||||
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=change.sender,
|
||||
membership=change.membership,
|
||||
event_id=change.event_id,
|
||||
event_pos=change.event_pos,
|
||||
room_version_id=change.room_version_id,
|
||||
# We keep the current state of the room though
|
||||
room_type=current_room_entry.room_type,
|
||||
is_encrypted=current_room_entry.is_encrypted,
|
||||
)
|
||||
existing_room = room_membership_for_user_map.get(room_id)
|
||||
if existing_room is not None:
|
||||
# Update room membership events to the point in time of the `to_token`
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=change.sender,
|
||||
membership=change.membership,
|
||||
event_id=change.event_id,
|
||||
event_pos=change.event_pos,
|
||||
room_version_id=change.room_version_id,
|
||||
# We keep the state of the room though
|
||||
has_known_state=existing_room.has_known_state,
|
||||
room_type=existing_room.room_type,
|
||||
is_encrypted=existing_room.is_encrypted,
|
||||
)
|
||||
|
||||
(
|
||||
newly_joined_room_ids,
|
||||
@@ -264,45 +299,88 @@ class SlidingSyncRoomLists:
|
||||
)
|
||||
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
|
||||
|
||||
# Handle leaves and state resets in the from -> to token range.
|
||||
left_rooms = newly_left_room_map.keys() - room_membership_for_user_map.keys()
|
||||
if left_rooms:
|
||||
# Add back `newly_left` rooms (rooms left in the from -> to token range).
|
||||
#
|
||||
# We do this because `get_sliding_sync_rooms_for_user(...)` doesn't include
|
||||
# rooms that the user left themselves as it's more efficient to add them back
|
||||
# here than to fetch all rooms and then filter out the old left rooms. The user
|
||||
# only leaves a room once in a blue moon so this barely needs to run.
|
||||
#
|
||||
missing_newly_left_rooms = (
|
||||
newly_left_room_map.keys() - room_membership_for_user_map.keys()
|
||||
)
|
||||
if missing_newly_left_rooms:
|
||||
# TODO: It would be nice to avoid these copies
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
for room_id in left_rooms:
|
||||
room_for_user = newly_left_room_map[room_id]
|
||||
for room_id in missing_newly_left_rooms:
|
||||
newly_left_room_for_user = newly_left_room_map[room_id]
|
||||
# This should be a given
|
||||
assert newly_left_room_for_user.membership == Membership.LEAVE
|
||||
|
||||
# We fetch the current room entry for the user, as that's the
|
||||
# easiest way of getting the room type etc.
|
||||
current_room_entry = await self.store.get_sliding_sync_room_for_user(
|
||||
user_id, room_id
|
||||
# Add back `newly_left` rooms
|
||||
#
|
||||
# Check for membership and state in the Sliding Sync tables as it's just
|
||||
# another membership
|
||||
newly_left_room_for_user_sliding_sync = (
|
||||
await self.store.get_sliding_sync_room_for_user(user_id, room_id)
|
||||
)
|
||||
if current_room_entry is None:
|
||||
# We should always have an entry, even if we get state reset
|
||||
# out of the room.
|
||||
logger.error("Can't find room for user: %s / %s", user_id, room_id)
|
||||
continue
|
||||
# If the membership exists, it's just a normal user left the room on
|
||||
# their own
|
||||
if newly_left_room_for_user_sliding_sync is not None:
|
||||
room_membership_for_user_map[room_id] = (
|
||||
newly_left_room_for_user_sliding_sync
|
||||
)
|
||||
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=room_for_user.sender,
|
||||
membership=room_for_user.membership,
|
||||
event_id=room_for_user.event_id,
|
||||
event_pos=room_for_user.event_pos,
|
||||
room_version_id=room_for_user.room_version_id,
|
||||
room_type=current_room_entry.room_type,
|
||||
is_encrypted=current_room_entry.is_encrypted,
|
||||
)
|
||||
change = changes.get(room_id)
|
||||
if change is not None:
|
||||
# Update room membership events to the point in time of the `to_token`
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=change.sender,
|
||||
membership=change.membership,
|
||||
event_id=change.event_id,
|
||||
event_pos=change.event_pos,
|
||||
room_version_id=change.room_version_id,
|
||||
# We keep the state of the room though
|
||||
has_known_state=newly_left_room_for_user_sliding_sync.has_known_state,
|
||||
room_type=newly_left_room_for_user_sliding_sync.room_type,
|
||||
is_encrypted=newly_left_room_for_user_sliding_sync.is_encrypted,
|
||||
)
|
||||
|
||||
# If we are `newly_left` from the room but can't find any membership,
|
||||
# then we have been "state reset" out of the room
|
||||
else:
|
||||
# Get the state at the time. We can't read from the Sliding Sync
|
||||
# tables because the user has no membership in the room according to
|
||||
# the state (thanks to the state reset).
|
||||
#
|
||||
# Note: `room_type` never changes, so we can just get current room
|
||||
# type
|
||||
room_type = await self.store.get_room_type(room_id)
|
||||
has_known_state = room_type is not ROOM_UNKNOWN_SENTINEL
|
||||
if isinstance(room_type, StateSentinel):
|
||||
room_type = None
|
||||
|
||||
# Get the encryption status at the time of the token
|
||||
is_encrypted = await self.get_is_encrypted_for_room_at_token(
|
||||
room_id,
|
||||
newly_left_room_for_user.event_pos.to_room_stream_token(),
|
||||
)
|
||||
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=newly_left_room_for_user.sender,
|
||||
membership=newly_left_room_for_user.membership,
|
||||
event_id=newly_left_room_for_user.event_id,
|
||||
event_pos=newly_left_room_for_user.event_pos,
|
||||
room_version_id=newly_left_room_for_user.room_version_id,
|
||||
has_known_state=has_known_state,
|
||||
room_type=room_type,
|
||||
is_encrypted=is_encrypted,
|
||||
)
|
||||
|
||||
if sync_config.lists:
|
||||
sync_room_map = {
|
||||
room_id: room_membership_for_user
|
||||
for room_id, room_membership_for_user in room_membership_for_user_map.items()
|
||||
if filter_membership_for_sync(
|
||||
user_id=user_id,
|
||||
room_membership_for_user=room_membership_for_user,
|
||||
newly_left=room_id in newly_left_room_map,
|
||||
)
|
||||
}
|
||||
sync_room_map = room_membership_for_user_map
|
||||
with start_active_span("assemble_sliding_window_lists"):
|
||||
for list_key, list_config in sync_config.lists.items():
|
||||
# Apply filters
|
||||
@@ -311,6 +389,7 @@ class SlidingSyncRoomLists:
|
||||
filtered_sync_room_map = await self.filter_rooms_using_tables(
|
||||
user_id,
|
||||
sync_room_map,
|
||||
previous_connection_state,
|
||||
list_config.filters,
|
||||
to_token,
|
||||
dm_room_ids,
|
||||
@@ -338,13 +417,34 @@ class SlidingSyncRoomLists:
|
||||
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
|
||||
|
||||
if list_config.ranges:
|
||||
if list_config.ranges == [(0, len(filtered_sync_room_map) - 1)]:
|
||||
# If we are asking for the full range, we don't need to sort the list.
|
||||
sorted_room_info = list(filtered_sync_room_map.values())
|
||||
# Optimization: If we are asking for the full range, we don't
|
||||
# need to sort the list.
|
||||
if (
|
||||
# We're looking for a single range that covers the entire list
|
||||
len(list_config.ranges) == 1
|
||||
# Range starts at 0
|
||||
and list_config.ranges[0][0] == 0
|
||||
# And the range extends to the end of the list or more. Each
|
||||
# side is inclusive.
|
||||
and list_config.ranges[0][1]
|
||||
>= len(filtered_sync_room_map) - 1
|
||||
):
|
||||
sorted_room_info: List[RoomsForUserType] = list(
|
||||
filtered_sync_room_map.values()
|
||||
)
|
||||
else:
|
||||
# Sort the list
|
||||
sorted_room_info = await self.sort_rooms_using_tables(
|
||||
filtered_sync_room_map, to_token
|
||||
sorted_room_info = await self.sort_rooms(
|
||||
# Cast is safe because RoomsForUserSlidingSync is part
|
||||
# of the `RoomsForUserType` union. Why can't it detect this?
|
||||
cast(
|
||||
Dict[str, RoomsForUserType], filtered_sync_room_map
|
||||
),
|
||||
to_token,
|
||||
# We only need to sort the rooms up to the end
|
||||
# of the largest range. Both sides of range are
|
||||
# inclusive so we `+ 1`.
|
||||
limit=max(range[1] + 1 for range in list_config.ranges),
|
||||
)
|
||||
|
||||
for range in list_config.ranges:
|
||||
@@ -387,34 +487,40 @@ class SlidingSyncRoomLists:
|
||||
)
|
||||
|
||||
lists[list_key] = SlidingSyncResult.SlidingWindowList(
|
||||
count=len(sorted_room_info),
|
||||
count=len(filtered_sync_room_map),
|
||||
ops=ops,
|
||||
)
|
||||
|
||||
if sync_config.room_subscriptions:
|
||||
with start_active_span("assemble_room_subscriptions"):
|
||||
# TODO: It would be nice to avoid these copies
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
|
||||
# Find which rooms are partially stated and may need to be filtered out
|
||||
# depending on the `required_state` requested (see below).
|
||||
partial_state_rooms = await self.store.get_partial_rooms()
|
||||
|
||||
# Fetch any rooms that we have not already fetched from the database.
|
||||
subscription_sliding_sync_rooms = (
|
||||
await self.store.get_sliding_sync_room_for_user_batch(
|
||||
user_id,
|
||||
sync_config.room_subscriptions.keys()
|
||||
- room_membership_for_user_map.keys(),
|
||||
)
|
||||
)
|
||||
room_membership_for_user_map.update(subscription_sliding_sync_rooms)
|
||||
|
||||
for (
|
||||
room_id,
|
||||
room_subscription,
|
||||
) in sync_config.room_subscriptions.items():
|
||||
if room_id not in room_membership_for_user_map:
|
||||
# Check if we do have an entry for the room, but didn't
|
||||
# pull it out above. This could be e.g. a leave that we
|
||||
# don't pull out by default.
|
||||
current_room_entry = (
|
||||
await self.store.get_sliding_sync_room_for_user(
|
||||
user_id, room_id
|
||||
)
|
||||
)
|
||||
if not current_room_entry:
|
||||
# TODO: Handle rooms the user isn't in.
|
||||
continue
|
||||
|
||||
room_membership_for_user_map[room_id] = current_room_entry
|
||||
# Check if we have a membership for the room, but didn't pull it out
|
||||
# above. This could be e.g. a leave that we don't pull out by
|
||||
# default.
|
||||
current_room_entry = room_membership_for_user_map.get(room_id)
|
||||
if not current_room_entry:
|
||||
# TODO: Handle rooms the user isn't in.
|
||||
continue
|
||||
|
||||
all_rooms.add(room_id)
|
||||
|
||||
@@ -429,8 +535,6 @@ class SlidingSyncRoomLists:
|
||||
if room_id in partial_state_rooms:
|
||||
continue
|
||||
|
||||
all_rooms.add(room_id)
|
||||
|
||||
# Update our `relevant_room_map` with the room we're going to display
|
||||
# and need to fetch more info about.
|
||||
existing_room_sync_config = relevant_room_map.get(room_id)
|
||||
@@ -445,7 +549,7 @@ class SlidingSyncRoomLists:
|
||||
|
||||
# Filtered subset of `relevant_room_map` for rooms that may have updates
|
||||
# (in the event stream)
|
||||
relevant_rooms_to_send_map = await self._filter_relevant_room_to_send(
|
||||
relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
|
||||
previous_connection_state, from_token, relevant_room_map
|
||||
)
|
||||
|
||||
@@ -502,6 +606,7 @@ class SlidingSyncRoomLists:
|
||||
filtered_sync_room_map = await self.filter_rooms(
|
||||
sync_config.user,
|
||||
sync_room_map,
|
||||
previous_connection_state,
|
||||
list_config.filters,
|
||||
to_token,
|
||||
dm_room_ids,
|
||||
@@ -632,7 +737,7 @@ class SlidingSyncRoomLists:
|
||||
|
||||
# Filtered subset of `relevant_room_map` for rooms that may have updates
|
||||
# (in the event stream)
|
||||
relevant_rooms_to_send_map = await self._filter_relevant_room_to_send(
|
||||
relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
|
||||
previous_connection_state, from_token, relevant_room_map
|
||||
)
|
||||
|
||||
@@ -647,7 +752,7 @@ class SlidingSyncRoomLists:
|
||||
dm_room_ids=dm_room_ids,
|
||||
)
|
||||
|
||||
async def _filter_relevant_room_to_send(
|
||||
async def _filter_relevant_rooms_to_send(
|
||||
self,
|
||||
previous_connection_state: PerConnectionState,
|
||||
from_token: Optional[StreamToken],
|
||||
@@ -911,8 +1016,38 @@ class SlidingSyncRoomLists:
|
||||
excluded_rooms=self.rooms_to_exclude_globally,
|
||||
)
|
||||
|
||||
# If the user has never joined any rooms before, we can just return an empty list
|
||||
if not room_for_user_list:
|
||||
# We filter out unknown room versions before we try and load any
|
||||
# metadata about the room. They shouldn't go down sync anyway, and their
|
||||
# metadata may be in a broken state.
|
||||
room_for_user_list = [
|
||||
room_for_user
|
||||
for room_for_user in room_for_user_list
|
||||
if room_for_user.room_version_id in KNOWN_ROOM_VERSIONS
|
||||
]
|
||||
|
||||
# Remove invites from ignored users
|
||||
ignored_users = await self.store.ignored_users(user_id)
|
||||
if ignored_users:
|
||||
room_for_user_list = [
|
||||
room_for_user
|
||||
for room_for_user in room_for_user_list
|
||||
if not (
|
||||
room_for_user.membership == Membership.INVITE
|
||||
and room_for_user.sender in ignored_users
|
||||
)
|
||||
]
|
||||
|
||||
(
|
||||
newly_joined_room_ids,
|
||||
newly_left_room_map,
|
||||
) = await self._get_newly_joined_and_left_rooms(
|
||||
user_id, to_token=to_token, from_token=from_token
|
||||
)
|
||||
|
||||
# If the user has never joined any rooms before, we can just return an empty
|
||||
# list. We also have to check the `newly_left_room_map` in case someone was
|
||||
# state reset out of all of the rooms they were in.
|
||||
if not room_for_user_list and not newly_left_room_map:
|
||||
return {}, set(), set()
|
||||
|
||||
# Since we fetched the users room list at some point in time after the
|
||||
@@ -930,23 +1065,22 @@ class SlidingSyncRoomLists:
|
||||
else:
|
||||
rooms_for_user[room_id] = change_room_for_user
|
||||
|
||||
(
|
||||
newly_joined_room_ids,
|
||||
newly_left_room_ids,
|
||||
) = await self._get_newly_joined_and_left_rooms(
|
||||
user_id, to_token=to_token, from_token=from_token
|
||||
)
|
||||
|
||||
# Ensure we have entries for rooms that the user has been "state reset"
|
||||
# out of. These are rooms appear in the `newly_left_rooms` map but
|
||||
# aren't in the `rooms_for_user` map.
|
||||
for room_id, room_membership in newly_left_room_ids.items():
|
||||
for room_id, newly_left_room_for_user in newly_left_room_map.items():
|
||||
# If we already know about the room, it's not a state reset
|
||||
if room_id in rooms_for_user:
|
||||
continue
|
||||
|
||||
rooms_for_user[room_id] = room_membership
|
||||
# This should be true if it's a state reset
|
||||
assert newly_left_room_for_user.membership is Membership.LEAVE
|
||||
assert newly_left_room_for_user.event_id is None
|
||||
assert newly_left_room_for_user.sender is None
|
||||
|
||||
return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids)
|
||||
rooms_for_user[room_id] = newly_left_room_for_user
|
||||
|
||||
return rooms_for_user, newly_joined_room_ids, set(newly_left_room_map)
|
||||
|
||||
@trace
|
||||
async def _get_newly_joined_and_left_rooms(
|
||||
@@ -963,8 +1097,15 @@ class SlidingSyncRoomLists:
|
||||
"current memberships" of the user.
|
||||
|
||||
Returns:
|
||||
A 2-tuple of newly joined room IDs and a map of newly left room
|
||||
A 2-tuple of newly joined room IDs and a map of newly_left room
|
||||
IDs to the `RoomsForUserStateReset` entry.
|
||||
|
||||
We're using `RoomsForUserStateReset` but that doesn't necessarily mean the
|
||||
user was state reset of the rooms. It's just that the `event_id`/`sender`
|
||||
are optional and we can't tell the difference between the server leaving the
|
||||
room when the user was the last person participating in the room and left or
|
||||
was state reset out of the room. To actually check for a state reset, you
|
||||
need to check if a membership still exists in the room.
|
||||
"""
|
||||
newly_joined_room_ids: Set[str] = set()
|
||||
newly_left_room_map: Dict[str, RoomsForUserStateReset] = {}
|
||||
@@ -1034,13 +1175,9 @@ class SlidingSyncRoomLists:
|
||||
# 2)
|
||||
if last_membership_change_in_from_to_range.membership == Membership.JOIN:
|
||||
possibly_newly_joined_room_ids.add(room_id)
|
||||
break
|
||||
|
||||
# 1) Figure out newly_left rooms (> `from_token` and <= `to_token`).
|
||||
if (
|
||||
last_membership_change_in_from_to_range.prev_membership
|
||||
== Membership.JOIN
|
||||
):
|
||||
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
|
||||
# 1) Mark this room as `newly_left`
|
||||
newly_left_room_map[room_id] = RoomsForUserStateReset(
|
||||
room_id=room_id,
|
||||
@@ -1492,6 +1629,7 @@ class SlidingSyncRoomLists:
|
||||
self,
|
||||
user: UserID,
|
||||
sync_room_map: Dict[str, RoomsForUserType],
|
||||
previous_connection_state: PerConnectionState,
|
||||
filters: SlidingSyncConfig.SlidingSyncList.Filters,
|
||||
to_token: StreamToken,
|
||||
dm_room_ids: AbstractSet[str],
|
||||
@@ -1511,6 +1649,8 @@ class SlidingSyncRoomLists:
|
||||
A filtered dictionary of room IDs along with membership information in the
|
||||
room at the time of `to_token`.
|
||||
"""
|
||||
user_id = user.to_string()
|
||||
|
||||
room_id_to_stripped_state_map: Dict[
|
||||
str, Optional[StateMap[StrippedStateEvent]]
|
||||
] = {}
|
||||
@@ -1620,12 +1760,14 @@ class SlidingSyncRoomLists:
|
||||
and room_type not in filters.room_types
|
||||
):
|
||||
filtered_room_id_set.remove(room_id)
|
||||
continue
|
||||
|
||||
if (
|
||||
filters.not_room_types is not None
|
||||
and room_type in filters.not_room_types
|
||||
):
|
||||
filtered_room_id_set.remove(room_id)
|
||||
continue
|
||||
|
||||
if filters.room_name_like is not None:
|
||||
with start_active_span("filters.room_name_like"):
|
||||
@@ -1642,18 +1784,64 @@ class SlidingSyncRoomLists:
|
||||
# )
|
||||
raise NotImplementedError()
|
||||
|
||||
# Filter by room tags according to the users account data
|
||||
if filters.tags is not None or filters.not_tags is not None:
|
||||
with start_active_span("filters.tags"):
|
||||
raise NotImplementedError()
|
||||
# Fetch the user tags for their rooms
|
||||
room_tags = await self.store.get_tags_for_user(user_id)
|
||||
room_id_to_tag_name_set: Dict[str, Set[str]] = {
|
||||
room_id: set(tags.keys()) for room_id, tags in room_tags.items()
|
||||
}
|
||||
|
||||
if filters.tags is not None:
|
||||
tags_set = set(filters.tags)
|
||||
filtered_room_id_set = {
|
||||
room_id
|
||||
for room_id in filtered_room_id_set
|
||||
# Remove rooms that don't have one of the tags in the filter
|
||||
if room_id_to_tag_name_set.get(room_id, set()).intersection(
|
||||
tags_set
|
||||
)
|
||||
}
|
||||
|
||||
if filters.not_tags is not None:
|
||||
not_tags_set = set(filters.not_tags)
|
||||
filtered_room_id_set = {
|
||||
room_id
|
||||
for room_id in filtered_room_id_set
|
||||
# Remove rooms if they have any of the tags in the filter
|
||||
if not room_id_to_tag_name_set.get(room_id, set()).intersection(
|
||||
not_tags_set
|
||||
)
|
||||
}
|
||||
|
||||
# Keep rooms if the user has been state reset out of it but we previously sent
|
||||
# down the connection before. We want to make sure that we send these down to
|
||||
# the client regardless of filters so they find out about the state reset.
|
||||
#
|
||||
# We don't always have access to the state in a room after being state reset if
|
||||
# no one else locally on the server is participating in the room so we patch
|
||||
# these back in manually.
|
||||
state_reset_out_of_room_id_set = {
|
||||
room_id
|
||||
for room_id in sync_room_map.keys()
|
||||
if sync_room_map[room_id].event_id is None
|
||||
and previous_connection_state.rooms.have_sent_room(room_id).status
|
||||
!= HaveSentRoomFlag.NEVER
|
||||
}
|
||||
|
||||
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
||||
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
||||
return {
|
||||
room_id: sync_room_map[room_id]
|
||||
for room_id in filtered_room_id_set | state_reset_out_of_room_id_set
|
||||
}
|
||||
|
||||
@trace
|
||||
async def filter_rooms_using_tables(
|
||||
self,
|
||||
user_id: str,
|
||||
sync_room_map: Mapping[str, RoomsForUserSlidingSync],
|
||||
previous_connection_state: PerConnectionState,
|
||||
filters: SlidingSyncConfig.SlidingSyncList.Filters,
|
||||
to_token: StreamToken,
|
||||
dm_room_ids: AbstractSet[str],
|
||||
@@ -1668,6 +1856,7 @@ class SlidingSyncRoomLists:
|
||||
filters: Filters to apply
|
||||
to_token: We filter based on the state of the room at this token
|
||||
dm_room_ids: Set of room IDs which are DMs
|
||||
room_tags: Mapping of room ID to tags
|
||||
|
||||
Returns:
|
||||
A filtered dictionary of room IDs along with membership information in the
|
||||
@@ -1695,7 +1884,10 @@ class SlidingSyncRoomLists:
|
||||
filtered_room_id_set = {
|
||||
room_id
|
||||
for room_id in filtered_room_id_set
|
||||
if sync_room_map[room_id].is_encrypted == filters.is_encrypted
|
||||
# Remove rooms if we can't figure out what the encryption status is
|
||||
if sync_room_map[room_id].has_known_state
|
||||
# Or remove if it doesn't match the filter
|
||||
and sync_room_map[room_id].is_encrypted == filters.is_encrypted
|
||||
}
|
||||
|
||||
# Filter for rooms that the user has been invited to
|
||||
@@ -1724,6 +1916,11 @@ class SlidingSyncRoomLists:
|
||||
# Make a copy so we don't run into an error: `Set changed size during
|
||||
# iteration`, when we filter out and remove items
|
||||
for room_id in filtered_room_id_set.copy():
|
||||
# Remove rooms if we can't figure out what room type it is
|
||||
if not sync_room_map[room_id].has_known_state:
|
||||
filtered_room_id_set.remove(room_id)
|
||||
continue
|
||||
|
||||
room_type = sync_room_map[room_id].room_type
|
||||
|
||||
if (
|
||||
@@ -1731,12 +1928,14 @@ class SlidingSyncRoomLists:
|
||||
and room_type not in filters.room_types
|
||||
):
|
||||
filtered_room_id_set.remove(room_id)
|
||||
continue
|
||||
|
||||
if (
|
||||
filters.not_room_types is not None
|
||||
and room_type in filters.not_room_types
|
||||
):
|
||||
filtered_room_id_set.remove(room_id)
|
||||
continue
|
||||
|
||||
if filters.room_name_like is not None:
|
||||
with start_active_span("filters.room_name_like"):
|
||||
@@ -1753,84 +1952,78 @@ class SlidingSyncRoomLists:
|
||||
# )
|
||||
raise NotImplementedError()
|
||||
|
||||
# Filter by room tags according to the users account data
|
||||
if filters.tags is not None or filters.not_tags is not None:
|
||||
with start_active_span("filters.tags"):
|
||||
raise NotImplementedError()
|
||||
# Fetch the user tags for their rooms
|
||||
room_tags = await self.store.get_tags_for_user(user_id)
|
||||
room_id_to_tag_name_set: Dict[str, Set[str]] = {
|
||||
room_id: set(tags.keys()) for room_id, tags in room_tags.items()
|
||||
}
|
||||
|
||||
if filters.tags is not None:
|
||||
tags_set = set(filters.tags)
|
||||
filtered_room_id_set = {
|
||||
room_id
|
||||
for room_id in filtered_room_id_set
|
||||
# Remove rooms that don't have one of the tags in the filter
|
||||
if room_id_to_tag_name_set.get(room_id, set()).intersection(
|
||||
tags_set
|
||||
)
|
||||
}
|
||||
|
||||
if filters.not_tags is not None:
|
||||
not_tags_set = set(filters.not_tags)
|
||||
filtered_room_id_set = {
|
||||
room_id
|
||||
for room_id in filtered_room_id_set
|
||||
# Remove rooms if they have any of the tags in the filter
|
||||
if not room_id_to_tag_name_set.get(room_id, set()).intersection(
|
||||
not_tags_set
|
||||
)
|
||||
}
|
||||
|
||||
# Keep rooms if the user has been state reset out of it but we previously sent
|
||||
# down the connection before. We want to make sure that we send these down to
|
||||
# the client regardless of filters so they find out about the state reset.
|
||||
#
|
||||
# We don't always have access to the state in a room after being state reset if
|
||||
# no one else locally on the server is participating in the room so we patch
|
||||
# these back in manually.
|
||||
state_reset_out_of_room_id_set = {
|
||||
room_id
|
||||
for room_id in sync_room_map.keys()
|
||||
if sync_room_map[room_id].event_id is None
|
||||
and previous_connection_state.rooms.have_sent_room(room_id).status
|
||||
!= HaveSentRoomFlag.NEVER
|
||||
}
|
||||
|
||||
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
||||
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
||||
|
||||
@trace
|
||||
async def sort_rooms_using_tables(
|
||||
self,
|
||||
sync_room_map: Mapping[str, RoomsForUserSlidingSync],
|
||||
to_token: StreamToken,
|
||||
) -> List[RoomsForUserSlidingSync]:
|
||||
"""
|
||||
Sort by `stream_ordering` of the last event that the user should see in the
|
||||
room. `stream_ordering` is unique so we get a stable sort.
|
||||
|
||||
Args:
|
||||
sync_room_map: Dictionary of room IDs to sort along with membership
|
||||
information in the room at the time of `to_token`.
|
||||
to_token: We sort based on the events in the room at this token (<= `to_token`)
|
||||
|
||||
Returns:
|
||||
A sorted list of room IDs by `stream_ordering` along with membership information.
|
||||
"""
|
||||
|
||||
# Assemble a map of room ID to the `stream_ordering` of the last activity that the
|
||||
# user should see in the room (<= `to_token`)
|
||||
last_activity_in_room_map: Dict[str, int] = {}
|
||||
|
||||
for room_id, room_for_user in sync_room_map.items():
|
||||
if room_for_user.membership != Membership.JOIN:
|
||||
# If the user has left/been invited/knocked/been banned from a
|
||||
# room, they shouldn't see anything past that point.
|
||||
#
|
||||
# FIXME: It's possible that people should see beyond this point
|
||||
# in invited/knocked cases if for example the room has
|
||||
# `invite`/`world_readable` history visibility, see
|
||||
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
|
||||
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
|
||||
|
||||
# For fully-joined rooms, we find the latest activity at/before the
|
||||
# `to_token`.
|
||||
joined_room_positions = (
|
||||
await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
|
||||
[
|
||||
room_id
|
||||
for room_id, room_for_user in sync_room_map.items()
|
||||
if room_for_user.membership == Membership.JOIN
|
||||
],
|
||||
to_token.room_key,
|
||||
)
|
||||
)
|
||||
|
||||
last_activity_in_room_map.update(joined_room_positions)
|
||||
|
||||
return sorted(
|
||||
sync_room_map.values(),
|
||||
# Sort by the last activity (stream_ordering) in the room
|
||||
key=lambda room_info: last_activity_in_room_map[room_info.room_id],
|
||||
# We want descending order
|
||||
reverse=True,
|
||||
)
|
||||
return {
|
||||
room_id: sync_room_map[room_id]
|
||||
for room_id in filtered_room_id_set | state_reset_out_of_room_id_set
|
||||
}
|
||||
|
||||
@trace
|
||||
async def sort_rooms(
|
||||
self,
|
||||
sync_room_map: Dict[str, RoomsForUserType],
|
||||
to_token: StreamToken,
|
||||
limit: Optional[int] = None,
|
||||
) -> List[RoomsForUserType]:
|
||||
"""
|
||||
Sort by `stream_ordering` of the last event that the user should see in the
|
||||
room. `stream_ordering` is unique so we get a stable sort.
|
||||
|
||||
If `limit` is specified then sort may return fewer entries, but will
|
||||
always return at least the top N rooms. This is useful as we don't always
|
||||
need to sort the full list, but are just interested in the top N.
|
||||
|
||||
Args:
|
||||
sync_room_map: Dictionary of room IDs to sort along with membership
|
||||
information in the room at the time of `to_token`.
|
||||
to_token: We sort based on the events in the room at this token (<= `to_token`)
|
||||
limit: The number of rooms that we need to return from the top of the list.
|
||||
|
||||
Returns:
|
||||
A sorted list of room IDs by `stream_ordering` along with membership information.
|
||||
@@ -1840,8 +2033,23 @@ class SlidingSyncRoomLists:
|
||||
# user should see in the room (<= `to_token`)
|
||||
last_activity_in_room_map: Dict[str, int] = {}
|
||||
|
||||
# Same as above, except for positions that we know are in the event
|
||||
# stream cache.
|
||||
cached_positions: Dict[str, int] = {}
|
||||
|
||||
earliest_cache_position = (
|
||||
self.store._events_stream_cache.get_earliest_known_position()
|
||||
)
|
||||
|
||||
for room_id, room_for_user in sync_room_map.items():
|
||||
if room_for_user.membership != Membership.JOIN:
|
||||
if room_for_user.membership == Membership.JOIN:
|
||||
# For joined rooms check the stream change cache.
|
||||
cached_position = (
|
||||
self.store._events_stream_cache.get_max_pos_of_last_change(room_id)
|
||||
)
|
||||
if cached_position is not None:
|
||||
cached_positions[room_id] = cached_position
|
||||
else:
|
||||
# If the user has left/been invited/knocked/been banned from a
|
||||
# room, they shouldn't see anything past that point.
|
||||
#
|
||||
@@ -1851,6 +2059,48 @@ class SlidingSyncRoomLists:
|
||||
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
|
||||
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
|
||||
|
||||
# If the stream position is in range of the stream change cache
|
||||
# we can include it.
|
||||
if room_for_user.event_pos.stream > earliest_cache_position:
|
||||
cached_positions[room_id] = room_for_user.event_pos.stream
|
||||
|
||||
# If we are only asked for the top N rooms, and we have enough from
|
||||
# looking in the stream change cache, then we can return early. This
|
||||
# is because the cache must include all entries above
|
||||
# `.get_earliest_known_position()`.
|
||||
if limit is not None and len(cached_positions) >= limit:
|
||||
# ... but first we need to handle the case where the cached max
|
||||
# position is greater than the to_token, in which case we do
|
||||
# actually query the DB. This should happen rarely, so can do it in
|
||||
# a loop.
|
||||
for room_id, position in list(cached_positions.items()):
|
||||
if position > to_token.room_key.stream:
|
||||
result = await self.store.get_last_event_pos_in_room_before_stream_ordering(
|
||||
room_id, to_token.room_key
|
||||
)
|
||||
if (
|
||||
result is not None
|
||||
and result[1].stream > earliest_cache_position
|
||||
):
|
||||
# We have a stream position in the cached range.
|
||||
cached_positions[room_id] = result[1].stream
|
||||
else:
|
||||
# No position in the range, so we remove the entry.
|
||||
cached_positions.pop(room_id)
|
||||
|
||||
if limit is not None and len(cached_positions) >= limit:
|
||||
return sorted(
|
||||
(
|
||||
room
|
||||
for room in sync_room_map.values()
|
||||
if room.room_id in cached_positions
|
||||
),
|
||||
# Sort by the last activity (stream_ordering) in the room
|
||||
key=lambda room_info: cached_positions[room_info.room_id],
|
||||
# We want descending order
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
# For fully-joined rooms, we find the latest activity at/before the
|
||||
# `to_token`.
|
||||
joined_room_positions = (
|
||||
|
||||
@@ -1039,7 +1039,7 @@ class _MultipartParserProtocol(protocol.Protocol):
|
||||
self.deferred = deferred
|
||||
self.boundary = boundary
|
||||
self.max_length = max_length
|
||||
self.parser = None
|
||||
self.parser: Optional[multipart.MultipartParser] = None
|
||||
self.multipart_response = MultipartResponse()
|
||||
self.has_redirect = False
|
||||
self.in_json = False
|
||||
@@ -1097,7 +1097,7 @@ class _MultipartParserProtocol(protocol.Protocol):
|
||||
self.deferred.errback()
|
||||
self.file_length += end - start
|
||||
|
||||
callbacks = {
|
||||
callbacks: "multipart.multipart.MultipartCallbacks" = {
|
||||
"on_header_field": on_header_field,
|
||||
"on_header_value": on_header_value,
|
||||
"on_part_data": on_part_data,
|
||||
@@ -1113,7 +1113,7 @@ class _MultipartParserProtocol(protocol.Protocol):
|
||||
self.transport.abortConnection()
|
||||
|
||||
try:
|
||||
self.parser.write(incoming_data) # type: ignore[attr-defined]
|
||||
self.parser.write(incoming_data)
|
||||
except Exception as e:
|
||||
logger.warning(f"Exception writing to multipart parser: {e}")
|
||||
self.deferred.errback()
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
#
|
||||
#
|
||||
import abc
|
||||
import cgi
|
||||
import codecs
|
||||
import logging
|
||||
import random
|
||||
@@ -792,7 +791,7 @@ class MatrixFederationHttpClient:
|
||||
url_str,
|
||||
_flatten_response_never_received(e),
|
||||
)
|
||||
body = None
|
||||
body = b""
|
||||
|
||||
exc = HttpResponseException(
|
||||
response.code, response_phrase, body
|
||||
@@ -1813,8 +1812,9 @@ def check_content_type_is(headers: Headers, expected_content_type: str) -> None:
|
||||
)
|
||||
|
||||
c_type = content_type_headers[0].decode("ascii") # only the first header
|
||||
val, options = cgi.parse_header(c_type)
|
||||
if val != expected_content_type:
|
||||
# Extract the 'essence' of the mimetype, removing any parameter
|
||||
c_type_parsed = c_type.split(";", 1)[0].strip()
|
||||
if c_type_parsed != expected_content_type:
|
||||
raise RequestSendFailed(
|
||||
RuntimeError(
|
||||
f"Remote server sent Content-Type header of '{c_type}', not '{expected_content_type}'",
|
||||
|
||||
@@ -37,19 +37,17 @@ from typing import (
|
||||
overload,
|
||||
)
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import BaseModel, MissingError, PydanticValueError, ValidationError
|
||||
from pydantic.v1.error_wrappers import ErrorWrapper
|
||||
else:
|
||||
from pydantic import BaseModel, MissingError, PydanticValueError, ValidationError
|
||||
from pydantic.error_wrappers import ErrorWrapper
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse._pydantic_compat import (
|
||||
BaseModel,
|
||||
ErrorWrapper,
|
||||
MissingError,
|
||||
PydanticValueError,
|
||||
ValidationError,
|
||||
)
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.http import redact_uri
|
||||
from synapse.http.server import HttpServer
|
||||
|
||||
@@ -206,7 +206,7 @@ class Thumbnailer:
|
||||
def _encode_image(self, output_image: Image.Image, output_type: str) -> BytesIO:
|
||||
output_bytes_io = BytesIO()
|
||||
fmt = self.FORMATS[output_type]
|
||||
if fmt == "JPEG":
|
||||
if fmt == "JPEG" or fmt == "PNG" and output_image.mode == "CMYK":
|
||||
output_image = output_image.convert("RGB")
|
||||
output_image.save(output_bytes_io, fmt, quality=80)
|
||||
return output_bytes_io
|
||||
|
||||
@@ -41,6 +41,7 @@ import attr
|
||||
from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import Deferred
|
||||
|
||||
from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
|
||||
from synapse.api.errors import AuthError
|
||||
@@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import (
|
||||
ISynapseReactor,
|
||||
JsonDict,
|
||||
MultiWriterStreamToken,
|
||||
PersistedEventPosition,
|
||||
@@ -61,8 +63,11 @@ from synapse.types import (
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
|
||||
from synapse.util.async_helpers import (
|
||||
timeout_deferred,
|
||||
)
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.stringutils import shortstr
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -89,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
|
||||
return n
|
||||
|
||||
|
||||
class _NotificationListener:
|
||||
"""This represents a single client connection to the events stream.
|
||||
The events stream handler will have yielded to the deferred, so to
|
||||
notify the handler it is sufficient to resolve the deferred.
|
||||
"""
|
||||
|
||||
__slots__ = ["deferred"]
|
||||
|
||||
def __init__(self, deferred: "defer.Deferred"):
|
||||
self.deferred = deferred
|
||||
|
||||
|
||||
class _NotifierUserStream:
|
||||
"""This represents a user connected to the event stream.
|
||||
It tracks the most recent stream token for that user.
|
||||
@@ -113,59 +106,49 @@ class _NotifierUserStream:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
reactor: ISynapseReactor,
|
||||
user_id: str,
|
||||
rooms: StrCollection,
|
||||
current_token: StreamToken,
|
||||
time_now_ms: int,
|
||||
):
|
||||
self.reactor = reactor
|
||||
self.user_id = user_id
|
||||
self.rooms = set(rooms)
|
||||
self.current_token = current_token
|
||||
|
||||
# The last token for which we should wake up any streams that have a
|
||||
# token that comes before it. This gets updated every time we get poked.
|
||||
# We start it at the current token since if we get any streams
|
||||
# that have a token from before we have no idea whether they should be
|
||||
# woken up or not, so lets just wake them up.
|
||||
self.last_notified_token = current_token
|
||||
self.current_token = current_token
|
||||
self.last_notified_ms = time_now_ms
|
||||
|
||||
self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
|
||||
defer.Deferred()
|
||||
)
|
||||
# Set of listeners that we need to wake up when there has been a change.
|
||||
self.listeners: Set[Deferred[StreamToken]] = set()
|
||||
|
||||
def notify(
|
||||
def update_and_fetch_deferreds(
|
||||
self,
|
||||
stream_key: StreamKeyType,
|
||||
stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
|
||||
current_token: StreamToken,
|
||||
time_now_ms: int,
|
||||
) -> None:
|
||||
"""Notify any listeners for this user of a new event from an
|
||||
event source.
|
||||
) -> Collection["Deferred[StreamToken]"]:
|
||||
"""Update the stream for this user because of a new event from an
|
||||
event source, and return the set of deferreds to wake up.
|
||||
|
||||
Args:
|
||||
stream_key: The stream the event came from.
|
||||
stream_id: The new id for the stream the event came from.
|
||||
current_token: The new current token.
|
||||
time_now_ms: The current time in milliseconds.
|
||||
|
||||
Returns:
|
||||
The set of deferreds that need to be called.
|
||||
"""
|
||||
self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
|
||||
self.last_notified_token = self.current_token
|
||||
self.current_token = current_token
|
||||
self.last_notified_ms = time_now_ms
|
||||
notify_deferred = self.notify_deferred
|
||||
|
||||
log_kv(
|
||||
{
|
||||
"notify": self.user_id,
|
||||
"stream": stream_key,
|
||||
"stream_id": stream_id,
|
||||
"listeners": self.count_listeners(),
|
||||
}
|
||||
)
|
||||
listeners = self.listeners
|
||||
self.listeners = set()
|
||||
|
||||
users_woken_by_stream_counter.labels(stream_key).inc()
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self.notify_deferred = ObservableDeferred(defer.Deferred())
|
||||
notify_deferred.callback(self.current_token)
|
||||
return listeners
|
||||
|
||||
def remove(self, notifier: "Notifier") -> None:
|
||||
"""Remove this listener from all the indexes in the Notifier
|
||||
@@ -179,9 +162,9 @@ class _NotifierUserStream:
|
||||
notifier.user_to_user_stream.pop(self.user_id)
|
||||
|
||||
def count_listeners(self) -> int:
|
||||
return len(self.notify_deferred.observers())
|
||||
return len(self.listeners)
|
||||
|
||||
def new_listener(self, token: StreamToken) -> _NotificationListener:
|
||||
def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
|
||||
"""Returns a deferred that is resolved when there is a new token
|
||||
greater than the given token.
|
||||
|
||||
@@ -191,10 +174,17 @@ class _NotifierUserStream:
|
||||
"""
|
||||
# Immediately wake up stream if something has already since happened
|
||||
# since their last token.
|
||||
if self.last_notified_token != token:
|
||||
return _NotificationListener(defer.succeed(self.current_token))
|
||||
else:
|
||||
return _NotificationListener(self.notify_deferred.observe())
|
||||
if token != self.current_token:
|
||||
return defer.succeed(self.current_token)
|
||||
|
||||
# Create a new deferred and add it to the set of listeners. We add a
|
||||
# cancel handler to remove it from the set again, to handle timeouts.
|
||||
deferred: "Deferred[StreamToken]" = Deferred(
|
||||
canceller=lambda d: self.listeners.discard(d)
|
||||
)
|
||||
self.listeners.add(deferred)
|
||||
|
||||
return deferred
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
@@ -247,6 +237,7 @@ class Notifier:
|
||||
# List of callbacks to be notified when a lock is released
|
||||
self._lock_released_callback: List[Callable[[str, str, str], None]] = []
|
||||
|
||||
self.reactor = hs.get_reactor()
|
||||
self.clock = hs.get_clock()
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
self._pusher_pool = hs.get_pusherpool()
|
||||
@@ -342,14 +333,25 @@ class Notifier:
|
||||
# Wake up all related user stream notifiers
|
||||
user_streams = self.room_to_user_streams.get(room_id, set())
|
||||
time_now_ms = self.clock.time_msec()
|
||||
current_token = self.event_sources.get_current_token()
|
||||
|
||||
listeners: List["Deferred[StreamToken]"] = []
|
||||
for user_stream in user_streams:
|
||||
try:
|
||||
user_stream.notify(
|
||||
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
|
||||
listeners.extend(
|
||||
user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to notify listener")
|
||||
|
||||
with PreserveLoggingContext():
|
||||
for listener in listeners:
|
||||
listener.callback(current_token)
|
||||
|
||||
users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
|
||||
len(user_streams)
|
||||
)
|
||||
|
||||
# Poke the replication so that other workers also see the write to
|
||||
# the un-partial-stated rooms stream.
|
||||
self.notify_replication()
|
||||
@@ -519,12 +521,16 @@ class Notifier:
|
||||
rooms = rooms or []
|
||||
|
||||
with Measure(self.clock, "on_new_event"):
|
||||
user_streams = set()
|
||||
user_streams: Set[_NotifierUserStream] = set()
|
||||
|
||||
log_kv(
|
||||
{
|
||||
"waking_up_explicit_users": len(users),
|
||||
"waking_up_explicit_rooms": len(rooms),
|
||||
"users": shortstr(users),
|
||||
"rooms": shortstr(rooms),
|
||||
"stream": stream_key,
|
||||
"stream_id": new_token,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -544,12 +550,27 @@ class Notifier:
|
||||
)
|
||||
|
||||
time_now_ms = self.clock.time_msec()
|
||||
current_token = self.event_sources.get_current_token()
|
||||
listeners: List["Deferred[StreamToken]"] = []
|
||||
for user_stream in user_streams:
|
||||
try:
|
||||
user_stream.notify(stream_key, new_token, time_now_ms)
|
||||
listeners.extend(
|
||||
user_stream.update_and_fetch_deferreds(
|
||||
current_token, time_now_ms
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to notify listener")
|
||||
|
||||
# We resolve all these deferreds in one go so that we only need to
|
||||
# call `PreserveLoggingContext` once, as it has a bunch of overhead
|
||||
# (to calculate performance stats)
|
||||
with PreserveLoggingContext():
|
||||
for listener in listeners:
|
||||
listener.callback(current_token)
|
||||
|
||||
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
|
||||
|
||||
self.notify_replication()
|
||||
|
||||
# Notify appservices.
|
||||
@@ -586,6 +607,7 @@ class Notifier:
|
||||
if room_ids is None:
|
||||
room_ids = await self.store.get_rooms_for_user(user_id)
|
||||
user_stream = _NotifierUserStream(
|
||||
reactor=self.reactor,
|
||||
user_id=user_id,
|
||||
rooms=room_ids,
|
||||
current_token=current_token,
|
||||
@@ -608,8 +630,8 @@ class Notifier:
|
||||
# Now we wait for the _NotifierUserStream to be told there
|
||||
# is a new token.
|
||||
listener = user_stream.new_listener(prev_token)
|
||||
listener.deferred = timeout_deferred(
|
||||
listener.deferred,
|
||||
listener = timeout_deferred(
|
||||
listener,
|
||||
(end_time - now) / 1000.0,
|
||||
self.hs.get_reactor(),
|
||||
)
|
||||
@@ -622,7 +644,7 @@ class Notifier:
|
||||
)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
await listener.deferred
|
||||
await listener
|
||||
|
||||
log_kv(
|
||||
{
|
||||
|
||||
@@ -436,6 +436,7 @@ class BulkPushRuleEvaluator:
|
||||
self._related_event_match_enabled,
|
||||
event.room_version.msc3931_push_features,
|
||||
self.hs.config.experimental.msc1767_enabled, # MSC3931 flag
|
||||
self.hs.config.experimental.msc4210_enabled,
|
||||
)
|
||||
|
||||
for uid, rules in rules_by_user.items():
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
# Copyright (C) 2023-2024 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.replication.http import (
|
||||
account_data,
|
||||
delayed_events,
|
||||
devices,
|
||||
federation,
|
||||
login,
|
||||
@@ -64,3 +65,4 @@ class ReplicationRestResource(JsonResource):
|
||||
login.register_servlets(hs, self)
|
||||
register.register_servlets(hs, self)
|
||||
devices.register_servlets(hs, self)
|
||||
delayed_events.register_servlets(hs, self)
|
||||
|
||||
62
synapse/replication/http/delayed_events.py
Normal file
62
synapse/replication/http/delayed_events.py
Normal file
@@ -0,0 +1,62 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2024 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Dict, Optional, Tuple
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.replication.http._base import ReplicationEndpoint
|
||||
from synapse.types import JsonDict, JsonMapping
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ReplicationAddedDelayedEventRestServlet(ReplicationEndpoint):
|
||||
"""Handle a delayed event being added by another worker.
|
||||
|
||||
Request format:
|
||||
|
||||
POST /_synapse/replication/delayed_event_added/
|
||||
|
||||
{}
|
||||
"""
|
||||
|
||||
NAME = "added_delayed_event"
|
||||
PATH_ARGS = ()
|
||||
CACHE = False
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
|
||||
self.handler = hs.get_delayed_events_handler()
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload(next_send_ts: int) -> JsonDict: # type: ignore[override]
|
||||
return {"next_send_ts": next_send_ts}
|
||||
|
||||
async def _handle_request( # type: ignore[override]
|
||||
self, request: Request, content: JsonDict
|
||||
) -> Tuple[int, Dict[str, Optional[JsonMapping]]]:
|
||||
self.handler.on_added(int(content["next_send_ts"]))
|
||||
|
||||
return 200, {}
|
||||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
ReplicationAddedDelayedEventRestServlet(hs).register(http_server)
|
||||
@@ -48,7 +48,7 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
|
||||
|
||||
"""
|
||||
|
||||
NAME = "add_user_account_data"
|
||||
NAME = "remove_pusher"
|
||||
PATH_ARGS = ("user_id",)
|
||||
CACHE = False
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
# Copyright (C) 2023-2024 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
@@ -31,6 +31,7 @@ from synapse.rest.client import (
|
||||
auth,
|
||||
auth_issuer,
|
||||
capabilities,
|
||||
delayed_events,
|
||||
devices,
|
||||
directory,
|
||||
events,
|
||||
@@ -81,6 +82,7 @@ CLIENT_SERVLET_FUNCTIONS: Tuple[RegisterServletsFunc, ...] = (
|
||||
room.register_deprecated_servlets,
|
||||
events.register_servlets,
|
||||
room.register_servlets,
|
||||
delayed_events.register_servlets,
|
||||
login.register_servlets,
|
||||
profile.register_servlets,
|
||||
presence.register_servlets,
|
||||
|
||||
@@ -98,6 +98,8 @@ from synapse.rest.admin.users import (
|
||||
DeactivateAccountRestServlet,
|
||||
PushersRestServlet,
|
||||
RateLimitRestServlet,
|
||||
RedactUser,
|
||||
RedactUserStatus,
|
||||
ResetPasswordRestServlet,
|
||||
SearchUsersRestServlet,
|
||||
ShadowBanRestServlet,
|
||||
@@ -319,6 +321,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
|
||||
UserByExternalId(hs).register(http_server)
|
||||
UserByThreePid(hs).register(http_server)
|
||||
RedactUser(hs).register(http_server)
|
||||
RedactUserStatus(hs).register(http_server)
|
||||
|
||||
DeviceRestServlet(hs).register(http_server)
|
||||
DevicesRestServlet(hs).register(http_server)
|
||||
|
||||
@@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
|
||||
|
||||
import attr
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
from synapse._pydantic_compat import StrictBool, StrictInt, StrictStr
|
||||
from synapse.api.constants import Direction, UserTypes
|
||||
from synapse.api.errors import Codes, NotFoundError, SynapseError
|
||||
from synapse.http.servlet import (
|
||||
@@ -50,17 +50,12 @@ from synapse.rest.admin._base import (
|
||||
from synapse.rest.client._base import client_patterns
|
||||
from synapse.storage.databases.main.registration import ExternalIDReuseException
|
||||
from synapse.storage.databases.main.stats import UserSortOrder
|
||||
from synapse.types import JsonDict, JsonMapping, UserID
|
||||
from synapse.types import JsonDict, JsonMapping, TaskStatus, UserID
|
||||
from synapse.types.rest import RequestBodyModel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import StrictBool
|
||||
else:
|
||||
from pydantic import StrictBool
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -1410,3 +1405,99 @@ class UserByThreePid(RestServlet):
|
||||
raise NotFoundError("User not found")
|
||||
|
||||
return HTTPStatus.OK, {"user_id": user_id}
|
||||
|
||||
|
||||
class RedactUser(RestServlet):
|
||||
"""
|
||||
Redact all the events of a given user in the given rooms or if empty dict is provided
|
||||
then all events in all rooms user is member of. Kicks off a background process and
|
||||
returns an id that can be used to check on the progress of the redaction progress
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/user/(?P<user_id>[^/]*)/redact")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._auth = hs.get_auth()
|
||||
self._store = hs.get_datastores().main
|
||||
self.admin_handler = hs.get_admin_handler()
|
||||
|
||||
class PostBody(RequestBodyModel):
|
||||
rooms: List[StrictStr]
|
||||
reason: Optional[StrictStr]
|
||||
limit: Optional[StrictInt]
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, user_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self._auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self._auth, requester)
|
||||
|
||||
# parse provided user id to check that it is valid
|
||||
UserID.from_string(user_id)
|
||||
|
||||
body = parse_and_validate_json_object_from_request(request, self.PostBody)
|
||||
|
||||
limit = body.limit
|
||||
if limit and limit <= 0:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"If limit is provided it must be a non-negative integer greater than 0.",
|
||||
)
|
||||
|
||||
rooms = body.rooms
|
||||
if not rooms:
|
||||
current_rooms = list(await self._store.get_rooms_for_user(user_id))
|
||||
banned_rooms = list(
|
||||
await self._store.get_rooms_user_currently_banned_from(user_id)
|
||||
)
|
||||
rooms = current_rooms + banned_rooms
|
||||
|
||||
redact_id = await self.admin_handler.start_redact_events(
|
||||
user_id, rooms, requester.serialize(), body.reason, limit
|
||||
)
|
||||
|
||||
return HTTPStatus.OK, {"redact_id": redact_id}
|
||||
|
||||
|
||||
class RedactUserStatus(RestServlet):
|
||||
"""
|
||||
Check on the progress of the redaction request represented by the provided ID, returning
|
||||
the status of the process and a dict of events that were unable to be redacted, if any
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/user/redact_status/(?P<redact_id>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._auth = hs.get_auth()
|
||||
self.admin_handler = hs.get_admin_handler()
|
||||
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, redact_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self._auth, request)
|
||||
|
||||
task = await self.admin_handler.get_redact_task(redact_id)
|
||||
|
||||
if task:
|
||||
if task.status == TaskStatus.ACTIVE:
|
||||
return HTTPStatus.OK, {"status": TaskStatus.ACTIVE}
|
||||
elif task.status == TaskStatus.COMPLETE:
|
||||
assert task.result is not None
|
||||
failed_redactions = task.result.get("failed_redactions")
|
||||
return HTTPStatus.OK, {
|
||||
"status": TaskStatus.COMPLETE,
|
||||
"failed_redactions": failed_redactions if failed_redactions else {},
|
||||
}
|
||||
elif task.status == TaskStatus.SCHEDULED:
|
||||
return HTTPStatus.OK, {"status": TaskStatus.SCHEDULED}
|
||||
else:
|
||||
return HTTPStatus.OK, {
|
||||
"status": TaskStatus.FAILED,
|
||||
"error": (
|
||||
task.error
|
||||
if task.error
|
||||
else "Unknown error, please check the logs for more information."
|
||||
),
|
||||
}
|
||||
else:
|
||||
raise NotFoundError("redact id '%s' not found" % redact_id)
|
||||
|
||||
@@ -24,18 +24,12 @@ import random
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import StrictBool, StrictStr, constr
|
||||
else:
|
||||
from pydantic import StrictBool, StrictStr, constr
|
||||
|
||||
import attr
|
||||
from typing_extensions import Literal
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse._pydantic_compat import StrictBool, StrictStr, constr
|
||||
from synapse.api.constants import LoginType
|
||||
from synapse.api.errors import (
|
||||
Codes,
|
||||
|
||||
111
synapse/rest/client/delayed_events.py
Normal file
111
synapse/rest/client/delayed_events.py
Normal file
@@ -0,0 +1,111 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2024 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
|
||||
# This module contains REST servlets to do with delayed events: /delayed_events/<paths>
|
||||
|
||||
import logging
|
||||
from enum import Enum
|
||||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING, Tuple
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.rest.client._base import client_patterns
|
||||
from synapse.types import JsonDict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _UpdateDelayedEventAction(Enum):
|
||||
CANCEL = "cancel"
|
||||
RESTART = "restart"
|
||||
SEND = "send"
|
||||
|
||||
|
||||
class UpdateDelayedEventServlet(RestServlet):
|
||||
PATTERNS = client_patterns(
|
||||
r"/org\.matrix\.msc4140/delayed_events/(?P<delay_id>[^/]+)$",
|
||||
releases=(),
|
||||
)
|
||||
CATEGORY = "Delayed event management requests"
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
self.auth = hs.get_auth()
|
||||
self.delayed_events_handler = hs.get_delayed_events_handler()
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, delay_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
try:
|
||||
action = str(body["action"])
|
||||
except KeyError:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"'action' is missing",
|
||||
Codes.MISSING_PARAM,
|
||||
)
|
||||
try:
|
||||
enum_action = _UpdateDelayedEventAction(action)
|
||||
except ValueError:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"'action' is not one of "
|
||||
+ ", ".join(f"'{m.value}'" for m in _UpdateDelayedEventAction),
|
||||
Codes.INVALID_PARAM,
|
||||
)
|
||||
|
||||
if enum_action == _UpdateDelayedEventAction.CANCEL:
|
||||
await self.delayed_events_handler.cancel(requester, delay_id)
|
||||
elif enum_action == _UpdateDelayedEventAction.RESTART:
|
||||
await self.delayed_events_handler.restart(requester, delay_id)
|
||||
elif enum_action == _UpdateDelayedEventAction.SEND:
|
||||
await self.delayed_events_handler.send(requester, delay_id)
|
||||
return 200, {}
|
||||
|
||||
|
||||
class DelayedEventsServlet(RestServlet):
|
||||
PATTERNS = client_patterns(
|
||||
r"/org\.matrix\.msc4140/delayed_events$",
|
||||
releases=(),
|
||||
)
|
||||
CATEGORY = "Delayed event management requests"
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
self.auth = hs.get_auth()
|
||||
self.delayed_events_handler = hs.get_delayed_events_handler()
|
||||
|
||||
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
# TODO: Support Pagination stream API ("from" query parameter)
|
||||
delayed_events = await self.delayed_events_handler.get_all_for_user(requester)
|
||||
|
||||
ret = {"delayed_events": delayed_events}
|
||||
return 200, ret
|
||||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
# The following can't currently be instantiated on workers.
|
||||
if hs.config.worker.worker_app is None:
|
||||
UpdateDelayedEventServlet(hs).register(http_server)
|
||||
DelayedEventsServlet(hs).register(http_server)
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user