mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-07 01:20:16 +00:00
Compare commits
127 Commits
travis/win
...
erikj/ss_u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
89a6ff8224 | ||
|
|
29bd79728d | ||
|
|
569d884543 | ||
|
|
993644ded0 | ||
|
|
a5d25bb623 | ||
|
|
f162c92f2a | ||
|
|
9ce489be5e | ||
|
|
fae75b0376 | ||
|
|
f77bfbfa30 | ||
|
|
1892ba5f67 | ||
|
|
a51daffba5 | ||
|
|
b05b2e14bb | ||
|
|
a308d99f30 | ||
|
|
a9fc1fd112 | ||
|
|
6a11bdf01d | ||
|
|
8fea190a1f | ||
|
|
81c19c4cd2 | ||
|
|
aaa3c36420 | ||
|
|
3e7eb45eb1 | ||
|
|
bab37dfc6f | ||
|
|
9f9ec92526 | ||
|
|
ff7b27013e | ||
|
|
e1f5f0fbb8 | ||
|
|
8c9f2743bc | ||
|
|
b076941a36 | ||
|
|
8bbe65f3c0 | ||
|
|
b7faf01f26 | ||
|
|
4f7f6ee9a0 | ||
|
|
a640b318df | ||
|
|
34b7586446 | ||
|
|
70b0e38603 | ||
|
|
f31360e34b | ||
|
|
3ad38b644d | ||
|
|
44ac2aa3b6 | ||
|
|
11db575218 | ||
|
|
30e9f6e469 | ||
|
|
eb62d12063 | ||
|
|
ceb3686dcd | ||
|
|
1dfa59b238 | ||
|
|
bef6568537 | ||
|
|
244a255065 | ||
|
|
932cb0a928 | ||
|
|
2dad718265 | ||
|
|
5d8446298c | ||
|
|
d845e939a9 | ||
|
|
23727869c7 | ||
|
|
c270355349 | ||
|
|
e3db7b2d81 | ||
|
|
2b620e0a15 | ||
|
|
39731bb205 | ||
|
|
1d6186265a | ||
|
|
46de0ee16b | ||
|
|
b221f0b84b | ||
|
|
b2c55bd049 | ||
|
|
ed583d9c81 | ||
|
|
f76dc9923c | ||
|
|
7e997fb8b1 | ||
|
|
dbc2290cbe | ||
|
|
2f6b86e79a | ||
|
|
37f9876ccf | ||
|
|
8b449a8ce6 | ||
|
|
53db8a914e | ||
|
|
e4868f8a1e | ||
|
|
dcad81082c | ||
|
|
c56b070e6f | ||
|
|
be726724a8 | ||
|
|
62ae56a4ac | ||
|
|
808dab0699 | ||
|
|
34306be5aa | ||
|
|
be4a16ff44 | ||
|
|
568051c0f0 | ||
|
|
ebbabfe782 | ||
|
|
69ac4b6a6e | ||
|
|
729026e604 | ||
|
|
bdf37ad4c4 | ||
|
|
8bbc98e66d | ||
|
|
4b9f4c2abf | ||
|
|
e8ee784c75 | ||
|
|
48c1307911 | ||
|
|
d225b6b3eb | ||
|
|
1daae43f3a | ||
|
|
a9ee832e48 | ||
|
|
13a99fba1b | ||
|
|
e3a0681ecf | ||
|
|
de05a64246 | ||
|
|
d221512498 | ||
|
|
ed0face8ad | ||
|
|
73529d3732 | ||
|
|
1648337775 | ||
|
|
dc8ddc6472 | ||
|
|
d3f9afd8d9 | ||
|
|
43c865f7c9 | ||
|
|
71d83477cb | ||
|
|
6a01af59e1 | ||
|
|
f583f1dce4 | ||
|
|
a574de0062 | ||
|
|
3fee32ed6b | ||
|
|
5884f0a956 | ||
|
|
79924aebef | ||
|
|
574aa53126 | ||
|
|
83894180b2 | ||
|
|
429ecb7564 | ||
|
|
9e1acea051 | ||
|
|
899d33f2ba | ||
|
|
df11af14db | ||
|
|
d88ba45db9 | ||
|
|
14f2b1eb00 | ||
|
|
2af729a193 | ||
|
|
0de0689ae8 | ||
|
|
4c44020838 | ||
|
|
4f6194492a | ||
|
|
ab62aa09da | ||
|
|
fb66e938b2 | ||
|
|
5a97bbd895 | ||
|
|
606da398fc | ||
|
|
677142b6a9 | ||
|
|
342f0c35b7 | ||
|
|
5871daf877 | ||
|
|
30e14c8510 | ||
|
|
3232bc2982 | ||
|
|
4ca13ce0dd | ||
|
|
8e229535fa | ||
|
|
e0ff850cb7 | ||
|
|
22fbc5be54 | ||
|
|
1cf3ff6b40 | ||
|
|
62d8b0361b | ||
|
|
4d6f7c0fc9 |
2
.github/workflows/docker.yml
vendored
2
.github/workflows/docker.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
||||
run: docker buildx inspect
|
||||
|
||||
- name: Install Cosign
|
||||
uses: sigstore/cosign-installer@v3.5.0
|
||||
uses: sigstore/cosign-installer@v3.6.0
|
||||
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@@ -305,7 +305,7 @@ jobs:
|
||||
- lint-readme
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: matrix-org/done-action@v2
|
||||
- uses: matrix-org/done-action@v3
|
||||
with:
|
||||
needs: ${{ toJSON(needs) }}
|
||||
|
||||
@@ -737,7 +737,7 @@ jobs:
|
||||
- linting-done
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: matrix-org/done-action@v2
|
||||
- uses: matrix-org/done-action@v3
|
||||
with:
|
||||
needs: ${{ toJSON(needs) }}
|
||||
|
||||
|
||||
179
CHANGES.md
179
CHANGES.md
@@ -1,3 +1,182 @@
|
||||
# Synapse 1.113.0 (2024-08-13)
|
||||
|
||||
No significant changes since 1.113.0rc1.
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.113.0rc1 (2024-08-06)
|
||||
|
||||
### Features
|
||||
|
||||
- Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17447](https://github.com/element-hq/synapse/issues/17447))
|
||||
- Add Account Data extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17477](https://github.com/element-hq/synapse/issues/17477))
|
||||
- Add receipts extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17489](https://github.com/element-hq/synapse/issues/17489))
|
||||
- Add typing notification extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17505](https://github.com/element-hq/synapse/issues/17505))
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- Update experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint to handle invite/knock rooms when filtering. ([\#17450](https://github.com/element-hq/synapse/issues/17450))
|
||||
- Fix a bug introduced in v1.110.0 which caused `/keys/query` to return incomplete results, leading to high network activity and CPU usage on Matrix clients. ([\#17499](https://github.com/element-hq/synapse/issues/17499))
|
||||
|
||||
### Improved Documentation
|
||||
|
||||
- Update the [`allowed_local_3pids`](https://element-hq.github.io/synapse/v1.112/usage/configuration/config_documentation.html#allowed_local_3pids) config option's msisdn address to a working example. ([\#17476](https://github.com/element-hq/synapse/issues/17476))
|
||||
|
||||
### Internal Changes
|
||||
|
||||
- Change sliding sync to use their own token format in preparation for storing per-connection state. ([\#17452](https://github.com/element-hq/synapse/issues/17452))
|
||||
- Ensure we don't send down negative `bump_stamp` in experimental sliding sync endpoint. ([\#17478](https://github.com/element-hq/synapse/issues/17478))
|
||||
- Do not send down empty room entries down experimental sliding sync endpoint. ([\#17479](https://github.com/element-hq/synapse/issues/17479))
|
||||
- Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`. ([\#17481](https://github.com/element-hq/synapse/issues/17481), [\#17482](https://github.com/element-hq/synapse/issues/17482))
|
||||
- Add some opentracing tags and logging to the experimental sliding sync implementation. ([\#17501](https://github.com/element-hq/synapse/issues/17501))
|
||||
- Split and move Sliding Sync tests so we have some more sane test file sizes. ([\#17504](https://github.com/element-hq/synapse/issues/17504))
|
||||
- Update the `limited` field description in the Sliding Sync response to accurately describe what it actually represents. ([\#17507](https://github.com/element-hq/synapse/issues/17507))
|
||||
- Easier to understand `timeline` assertions in Sliding Sync tests. ([\#17511](https://github.com/element-hq/synapse/issues/17511))
|
||||
- Reset the sliding sync connection if we don't recognize the per-connection state position. ([\#17529](https://github.com/element-hq/synapse/issues/17529))
|
||||
|
||||
|
||||
|
||||
### Updates to locked dependencies
|
||||
|
||||
* Bump bcrypt from 4.1.3 to 4.2.0. ([\#17495](https://github.com/element-hq/synapse/issues/17495))
|
||||
* Bump black from 24.4.2 to 24.8.0. ([\#17522](https://github.com/element-hq/synapse/issues/17522))
|
||||
* Bump phonenumbers from 8.13.39 to 8.13.42. ([\#17521](https://github.com/element-hq/synapse/issues/17521))
|
||||
* Bump ruff from 0.5.4 to 0.5.5. ([\#17494](https://github.com/element-hq/synapse/issues/17494))
|
||||
* Bump serde_json from 1.0.120 to 1.0.121. ([\#17493](https://github.com/element-hq/synapse/issues/17493))
|
||||
* Bump serde_json from 1.0.121 to 1.0.122. ([\#17525](https://github.com/element-hq/synapse/issues/17525))
|
||||
* Bump towncrier from 23.11.0 to 24.7.1. ([\#17523](https://github.com/element-hq/synapse/issues/17523))
|
||||
* Bump types-pyopenssl from 24.1.0.20240425 to 24.1.0.20240722. ([\#17496](https://github.com/element-hq/synapse/issues/17496))
|
||||
* Bump types-setuptools from 70.1.0.20240627 to 71.1.0.20240726. ([\#17497](https://github.com/element-hq/synapse/issues/17497))
|
||||
|
||||
# Synapse 1.112.0 (2024-07-30)
|
||||
|
||||
This security release is to update our locked dependency on Twisted to 24.7.0rc1, which includes a security fix for [CVE-2024-41671 / GHSA-c8m8-j448-xjx7: Disordered HTTP pipeline response in twisted.web, again](https://github.com/twisted/twisted/security/advisories/GHSA-c8m8-j448-xjx7).
|
||||
|
||||
Note that this security fix is also available as **Synapse 1.111.1**, which does not include the rest of the changes in Synapse 1.112.0.
|
||||
|
||||
This issue means that, if multiple HTTP requests are pipelined in the same TCP connection, Synapse can send responses to the wrong HTTP request.
|
||||
If a reverse proxy was configured to use HTTP pipelining, this could result in responses being sent to the wrong user, severely harming confidentiality.
|
||||
|
||||
With that said, despite being a high severity issue, **we consider it unlikely that Synapse installations will be affected**.
|
||||
The use of HTTP pipelining in this fashion would cause worse performance for clients (request-response latencies would be increased as users' responses would be artificially blocked behind other users' slow requests). Further, Nginx and Haproxy, two common reverse proxies, do not appear to support configuring their upstreams to use HTTP pipelining and thus would not be affected. For both of these reasons, we consider it unlikely that a Synapse deployment would be set up in such a configuration.
|
||||
|
||||
Despite that, we cannot rule out that some installations may exist with this unusual setup and so we are releasing this security update today.
|
||||
|
||||
**pip users:** Note that by default, upgrading Synapse using pip will not automatically upgrade Twisted. **Please manually install the new version of Twisted** using `pip install Twisted==24.7.0rc1`. Note also that even the `--upgrade-strategy=eager` flag to `pip install -U matrix-synapse` will not upgrade Twisted to a patched version because it is only a release candidate at this time.
|
||||
|
||||
### Internal Changes
|
||||
|
||||
- Upgrade locked dependency on Twisted to 24.7.0rc1. ([\#17502](https://github.com/element-hq/synapse/issues/17502))
|
||||
|
||||
|
||||
# Synapse 1.112.0rc1 (2024-07-23)
|
||||
|
||||
Please note that this release candidate does not include the security dependency update
|
||||
included in version 1.111.1 as this version was released before 1.111.1.
|
||||
The same security fix can be found in the full release of 1.112.0.
|
||||
|
||||
### Features
|
||||
|
||||
- Add to-device extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17416](https://github.com/element-hq/synapse/issues/17416))
|
||||
- Populate `name`/`avatar` fields in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17418](https://github.com/element-hq/synapse/issues/17418))
|
||||
- Populate `heroes` and room summary fields (`joined_count`, `invited_count`) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17419](https://github.com/element-hq/synapse/issues/17419))
|
||||
- Populate `is_dm` room field in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17429](https://github.com/element-hq/synapse/issues/17429))
|
||||
- Add room subscriptions to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17432](https://github.com/element-hq/synapse/issues/17432))
|
||||
- Prepare for authenticated media freeze. ([\#17433](https://github.com/element-hq/synapse/issues/17433))
|
||||
- Add E2EE extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17454](https://github.com/element-hq/synapse/issues/17454))
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- Add configurable option to always include offline users in presence sync results. Contributed by @Michael-Hollister. ([\#17231](https://github.com/element-hq/synapse/issues/17231))
|
||||
- Fix bug in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint when using room type filters and the user has one or more remote invites. ([\#17434](https://github.com/element-hq/synapse/issues/17434))
|
||||
- Order `heroes` by `stream_ordering` as the Matrix specification states (applies to `/sync`). ([\#17435](https://github.com/element-hq/synapse/issues/17435))
|
||||
- Fix rare bug where `/sync` would break for a user when using workers with multiple stream writers. ([\#17438](https://github.com/element-hq/synapse/issues/17438))
|
||||
|
||||
### Improved Documentation
|
||||
|
||||
- Update the readme image to have a white background, so that it is readable in dark mode. ([\#17387](https://github.com/element-hq/synapse/issues/17387))
|
||||
- Add Red Hat Enterprise Linux and Rocky Linux 8 and 9 installation instructions. ([\#17423](https://github.com/element-hq/synapse/issues/17423))
|
||||
- Improve documentation for the [`default_power_level_content_override`](https://element-hq.github.io/synapse/latest/usage/configuration/config_documentation.html#default_power_level_content_override) config option. ([\#17451](https://github.com/element-hq/synapse/issues/17451))
|
||||
|
||||
### Internal Changes
|
||||
|
||||
- Make sure we always use the right logic for enabling the media repo. ([\#17424](https://github.com/element-hq/synapse/issues/17424))
|
||||
- Fix argument documentation for method `RateLimiter.record_action`. ([\#17426](https://github.com/element-hq/synapse/issues/17426))
|
||||
- Reduce volume of 'Waiting for current token' logs, which were introduced in v1.109.0. ([\#17428](https://github.com/element-hq/synapse/issues/17428))
|
||||
- Limit concurrent remote downloads to 6 per IP address, and decrement remote downloads without a content-length from the ratelimiter after the download is complete. ([\#17439](https://github.com/element-hq/synapse/issues/17439))
|
||||
- Remove unnecessary call to resume producing in fake channel. ([\#17449](https://github.com/element-hq/synapse/issues/17449))
|
||||
- Update experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint to bump room when it is created. ([\#17453](https://github.com/element-hq/synapse/issues/17453))
|
||||
- Speed up generating sliding sync responses. ([\#17458](https://github.com/element-hq/synapse/issues/17458))
|
||||
- Add cache to `get_rooms_for_local_user_where_membership_is` to speed up sliding sync. ([\#17460](https://github.com/element-hq/synapse/issues/17460))
|
||||
- Speed up fetching room keys from backup. ([\#17461](https://github.com/element-hq/synapse/issues/17461))
|
||||
- Speed up sorting of the room list in sliding sync. ([\#17468](https://github.com/element-hq/synapse/issues/17468))
|
||||
- Implement handling of `$ME` as a state key in sliding sync. ([\#17469](https://github.com/element-hq/synapse/issues/17469))
|
||||
|
||||
|
||||
|
||||
### Updates to locked dependencies
|
||||
|
||||
* Bump bytes from 1.6.0 to 1.6.1. ([\#17441](https://github.com/element-hq/synapse/issues/17441))
|
||||
* Bump hiredis from 2.3.2 to 3.0.0. ([\#17464](https://github.com/element-hq/synapse/issues/17464))
|
||||
* Bump jsonschema from 4.22.0 to 4.23.0. ([\#17444](https://github.com/element-hq/synapse/issues/17444))
|
||||
* Bump matrix-org/done-action from 2 to 3. ([\#17440](https://github.com/element-hq/synapse/issues/17440))
|
||||
* Bump mypy from 1.9.0 to 1.10.1. ([\#17445](https://github.com/element-hq/synapse/issues/17445))
|
||||
* Bump pyopenssl from 24.1.0 to 24.2.1. ([\#17465](https://github.com/element-hq/synapse/issues/17465))
|
||||
* Bump ruff from 0.5.0 to 0.5.4. ([\#17466](https://github.com/element-hq/synapse/issues/17466))
|
||||
* Bump sentry-sdk from 2.6.0 to 2.8.0. ([\#17456](https://github.com/element-hq/synapse/issues/17456))
|
||||
* Bump sentry-sdk from 2.8.0 to 2.10.0. ([\#17467](https://github.com/element-hq/synapse/issues/17467))
|
||||
* Bump setuptools from 67.6.0 to 70.0.0. ([\#17448](https://github.com/element-hq/synapse/issues/17448))
|
||||
* Bump twine from 5.1.0 to 5.1.1. ([\#17443](https://github.com/element-hq/synapse/issues/17443))
|
||||
* Bump types-jsonschema from 4.22.0.20240610 to 4.23.0.20240712. ([\#17446](https://github.com/element-hq/synapse/issues/17446))
|
||||
* Bump ulid from 1.1.2 to 1.1.3. ([\#17442](https://github.com/element-hq/synapse/issues/17442))
|
||||
* Bump zipp from 3.15.0 to 3.19.1. ([\#17427](https://github.com/element-hq/synapse/issues/17427))
|
||||
|
||||
|
||||
# Synapse 1.111.1 (2024-07-30)
|
||||
|
||||
This security release is to update our locked dependency on Twisted to 24.7.0rc1, which includes a security fix for [CVE-2024-41671 / GHSA-c8m8-j448-xjx7: Disordered HTTP pipeline response in twisted.web, again](https://github.com/twisted/twisted/security/advisories/GHSA-c8m8-j448-xjx7).
|
||||
|
||||
This issue means that, if multiple HTTP requests are pipelined in the same TCP connection, Synapse can send responses to the wrong HTTP request.
|
||||
If a reverse proxy was configured to use HTTP pipelining, this could result in responses being sent to the wrong user, severely harming confidentiality.
|
||||
|
||||
With that said, despite being a high severity issue, **we consider it unlikely that Synapse installations will be affected**.
|
||||
The use of HTTP pipelining in this fashion would cause worse performance for clients (request-response latencies would be increased as users' responses would be artificially blocked behind other users' slow requests). Further, Nginx and Haproxy, two common reverse proxies, do not appear to support configuring their upstreams to use HTTP pipelining and thus would not be affected. For both of these reasons, we consider it unlikely that a Synapse deployment would be set up in such a configuration.
|
||||
|
||||
Despite that, we cannot rule out that some installations may exist with this unusual setup and so we are releasing this security update today.
|
||||
|
||||
**pip users:** Note that by default, upgrading Synapse using pip will not automatically upgrade Twisted. **Please manually install the new version of Twisted** using `pip install Twisted==24.7.0rc1`. Note also that even the `--upgrade-strategy=eager` flag to `pip install -U matrix-synapse` will not upgrade Twisted to a patched version because it is only a release candidate at this time.
|
||||
|
||||
|
||||
### Internal Changes
|
||||
|
||||
- Upgrade locked dependency on Twisted to 24.7.0rc1. ([\#17502](https://github.com/element-hq/synapse/issues/17502))
|
||||
|
||||
|
||||
# Synapse 1.111.0 (2024-07-16)
|
||||
|
||||
No significant changes since 1.111.0rc2.
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.111.0rc2 (2024-07-10)
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- Fix bug where using `synapse.app.media_repository` worker configuration would break the new media endpoints. ([\#17420](https://github.com/element-hq/synapse/issues/17420))
|
||||
|
||||
### Improved Documentation
|
||||
|
||||
- Document the new federation media worker endpoints in the [upgrade notes](https://element-hq.github.io/synapse/v1.111/upgrade.html) and [worker docs](https://element-hq.github.io/synapse/v1.111/workers.html). ([\#17421](https://github.com/element-hq/synapse/issues/17421))
|
||||
|
||||
### Internal Changes
|
||||
|
||||
- Route authenticated federation media requests to media repository workers in Complement tests. ([\#17422](https://github.com/element-hq/synapse/issues/17422))
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.111.0rc1 (2024-07-09)
|
||||
|
||||
### Features
|
||||
|
||||
25
Cargo.lock
generated
25
Cargo.lock
generated
@@ -67,9 +67,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.6.0"
|
||||
version = "1.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
|
||||
checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
@@ -444,9 +444,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.10.5"
|
||||
version = "1.10.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f"
|
||||
checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
@@ -485,18 +485,18 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.204"
|
||||
version = "1.0.206"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12"
|
||||
checksum = "5b3e4cd94123dd520a128bcd11e34d9e9e423e7e3e50425cb1b4b1e3549d0284"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.204"
|
||||
version = "1.0.206"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222"
|
||||
checksum = "fabfb6138d2383ea8208cf98ccf69cdfb1aff4088460681d84189aa259762f97"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -505,11 +505,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.120"
|
||||
version = "1.0.124"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5"
|
||||
checksum = "66ad62847a56b3dba58cc891acd13884b9c61138d330c0d7b6181713d4fce38d"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"memchr",
|
||||
"ryu",
|
||||
"serde",
|
||||
]
|
||||
@@ -597,9 +598,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
|
||||
|
||||
[[package]]
|
||||
name = "ulid"
|
||||
version = "1.1.2"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259"
|
||||
checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
"rand",
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
.. image:: https://github.com/element-hq/product/assets/87339233/7abf477a-5277-47f3-be44-ea44917d8ed7
|
||||
.. image:: ./docs/element_logo_white_bg.svg
|
||||
:height: 60px
|
||||
|
||||
**Element Synapse - Matrix homeserver implementation**
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Fix Docker image not building locally in some Windows environments.
|
||||
1
changelog.d/17483.bugfix
Normal file
1
changelog.d/17483.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Start handlers for new media endpoints when media resource configured.
|
||||
1
changelog.d/17510.bugfix
Normal file
1
changelog.d/17510.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix timeline ordering (using `stream_ordering` instead of topological ordering) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
|
||||
1
changelog.d/17514.misc
Normal file
1
changelog.d/17514.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add more tracing to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
|
||||
3
changelog.d/17515.doc
Normal file
3
changelog.d/17515.doc
Normal file
@@ -0,0 +1,3 @@
|
||||
Clarify default behaviour of the
|
||||
[`auto_accept_invites.worker_to_run_on`](https://element-hq.github.io/synapse/develop/usage/configuration/config_documentation.html#auto-accept-invites)
|
||||
option.
|
||||
1
changelog.d/17531.misc
Normal file
1
changelog.d/17531.misc
Normal file
@@ -0,0 +1 @@
|
||||
Fixup comment in sliding sync implementation.
|
||||
1
changelog.d/17535.bugfix
Normal file
1
changelog.d/17535.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix experimental sliding sync implementation to remember any updates in rooms that were not sent down immediately.
|
||||
1
changelog.d/17536.misc
Normal file
1
changelog.d/17536.misc
Normal file
@@ -0,0 +1 @@
|
||||
Replace override of deprecated method `HTTPAdapter.get_connection` with `get_connection_with_tls_context`.
|
||||
1
changelog.d/17537.misc
Normal file
1
changelog.d/17537.misc
Normal file
@@ -0,0 +1 @@
|
||||
Fix performance of device lists in `/key/changes` and sliding sync.
|
||||
1
changelog.d/17538.bugfix
Normal file
1
changelog.d/17538.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Better exclude partially stated rooms if we must await full state in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
|
||||
1
changelog.d/17542.misc
Normal file
1
changelog.d/17542.misc
Normal file
@@ -0,0 +1 @@
|
||||
Bump setuptools from 67.6.0 to 72.1.0.
|
||||
1
changelog.d/17545.bugfix
Normal file
1
changelog.d/17545.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Handle lower-case http headers in `_Mulitpart_Parser_Protocol`.
|
||||
1
changelog.d/17548.misc
Normal file
1
changelog.d/17548.misc
Normal file
@@ -0,0 +1 @@
|
||||
Fix performance of device lists in `/key/changes` and sliding sync.
|
||||
1
changelog.d/17557.misc
Normal file
1
changelog.d/17557.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add a utility function for generating random event IDs.
|
||||
1
changelog.d/17558.misc
Normal file
1
changelog.d/17558.misc
Normal file
@@ -0,0 +1 @@
|
||||
Speed up responding to media requests.
|
||||
1
changelog.d/17559.doc
Normal file
1
changelog.d/17559.doc
Normal file
@@ -0,0 +1 @@
|
||||
Improve docstrings for profile methods.
|
||||
1
changelog.d/17561.misc
Normal file
1
changelog.d/17561.misc
Normal file
@@ -0,0 +1 @@
|
||||
Speed up responding to media requests.
|
||||
1
changelog.d/17562.misc
Normal file
1
changelog.d/17562.misc
Normal file
@@ -0,0 +1 @@
|
||||
Test github token before running release script steps.
|
||||
1
changelog.d/17563.misc
Normal file
1
changelog.d/17563.misc
Normal file
@@ -0,0 +1 @@
|
||||
Reduce log spam of multipart files.
|
||||
1
changelog.d/17564.misc
Normal file
1
changelog.d/17564.misc
Normal file
@@ -0,0 +1 @@
|
||||
Speed up responding to media requests.
|
||||
1
changelog.d/17566.misc
Normal file
1
changelog.d/17566.misc
Normal file
@@ -0,0 +1 @@
|
||||
Speed up responding to media requests.
|
||||
1
changelog.d/17567.misc
Normal file
1
changelog.d/17567.misc
Normal file
@@ -0,0 +1 @@
|
||||
Speed up responding to media requests.
|
||||
1
changelog.d/17568.bugfix
Normal file
1
changelog.d/17568.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix fetching federation signing keys from servers that omit `old_verify_keys`. Contributed by @tulir @ Beeper.
|
||||
1
changelog.d/17569.misc
Normal file
1
changelog.d/17569.misc
Normal file
@@ -0,0 +1 @@
|
||||
Speed up responding to media requests.
|
||||
1
changelog.d/17570.bugfix
Normal file
1
changelog.d/17570.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix bug where we would respond with an error when a remote server asked for media that had a length of 0, using the new multipart federation media endpoint.
|
||||
1
changelog.d/17571.misc
Normal file
1
changelog.d/17571.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add a flag to `/versions`, `org.matrix.simplified_msc3575`, to indicate whether experimental sliding sync support has been enabled.
|
||||
42
debian/changelog
vendored
42
debian/changelog
vendored
@@ -1,3 +1,45 @@
|
||||
matrix-synapse-py3 (1.113.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.113.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 13 Aug 2024 14:36:56 +0100
|
||||
|
||||
matrix-synapse-py3 (1.113.0~rc1) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.113.0rc1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 06 Aug 2024 12:23:23 +0100
|
||||
|
||||
matrix-synapse-py3 (1.112.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.112.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 30 Jul 2024 17:15:48 +0100
|
||||
|
||||
matrix-synapse-py3 (1.112.0~rc1) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.112.0rc1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 23 Jul 2024 08:58:55 -0600
|
||||
|
||||
matrix-synapse-py3 (1.111.1) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.111.1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 30 Jul 2024 16:13:52 +0100
|
||||
|
||||
matrix-synapse-py3 (1.111.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.111.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 16 Jul 2024 12:42:46 +0200
|
||||
|
||||
matrix-synapse-py3 (1.111.0~rc2) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.111.0rc2.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Wed, 10 Jul 2024 08:46:54 +0000
|
||||
|
||||
matrix-synapse-py3 (1.111.0~rc1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.111.0rc1.
|
||||
|
||||
2
debian/templates
vendored
2
debian/templates
vendored
@@ -5,7 +5,7 @@ _Description: Name of the server:
|
||||
servers via federation. This is normally the public hostname of the
|
||||
server running synapse, but can be different if you set up delegation.
|
||||
Please refer to the delegation documentation in this case:
|
||||
https://github.com/element-hq/synapse/blob/master/docs/delegate.md.
|
||||
https://element-hq.github.io/synapse/latest/delegate.html.
|
||||
|
||||
Template: matrix-synapse/report-stats
|
||||
Type: boolean
|
||||
|
||||
@@ -27,7 +27,7 @@ ARG PYTHON_VERSION=3.11
|
||||
###
|
||||
# We hardcode the use of Debian bookworm here because this could change upstream
|
||||
# and other Dockerfiles used for testing are expecting bookworm.
|
||||
FROM docker.io/library/python:${PYTHON_VERSION}-slim-bookworm as requirements
|
||||
FROM docker.io/library/python:${PYTHON_VERSION}-slim-bookworm AS requirements
|
||||
|
||||
# RUN --mount is specific to buildkit and is documented at
|
||||
# https://github.com/moby/buildkit/blob/master/frontend/dockerfile/docs/syntax.md#build-mounts-run---mount.
|
||||
@@ -87,7 +87,7 @@ RUN if [ -z "$TEST_ONLY_IGNORE_POETRY_LOCKFILE" ]; then \
|
||||
###
|
||||
### Stage 1: builder
|
||||
###
|
||||
FROM docker.io/library/python:${PYTHON_VERSION}-slim-bookworm as builder
|
||||
FROM docker.io/library/python:${PYTHON_VERSION}-slim-bookworm AS builder
|
||||
|
||||
# install the OS build deps
|
||||
RUN \
|
||||
@@ -179,13 +179,11 @@ RUN \
|
||||
libicu72 \
|
||||
libssl-dev \
|
||||
openssl \
|
||||
dos2unix \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY --from=builder /install /usr/local
|
||||
COPY ./docker/start.py /start.py
|
||||
COPY ./docker/conf /conf
|
||||
RUN dos2unix /start.py
|
||||
|
||||
EXPOSE 8008/tcp 8009/tcp 8448/tcp
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ ARG distro=""
|
||||
# https://launchpad.net/~jyrki-pulliainen/+archive/ubuntu/dh-virtualenv, but
|
||||
# it's not obviously easier to use that than to build our own.)
|
||||
|
||||
FROM docker.io/library/${distro} as builder
|
||||
FROM docker.io/library/${distro} AS builder
|
||||
|
||||
RUN apt-get update -qq -o Acquire::Languages=none
|
||||
RUN env DEBIAN_FRONTEND=noninteractive apt-get install \
|
||||
|
||||
@@ -126,6 +126,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
||||
"^/_synapse/admin/v1/media/.*$",
|
||||
"^/_synapse/admin/v1/quarantine_media/.*$",
|
||||
"^/_matrix/client/v1/media/.*$",
|
||||
"^/_matrix/federation/v1/media/.*$",
|
||||
],
|
||||
# The first configured media worker will run the media background jobs
|
||||
"shared_extra_conf": {
|
||||
|
||||
@@ -21,8 +21,10 @@ incrementing integer, but backfilled events start with `stream_ordering=-1` and
|
||||
|
||||
---
|
||||
|
||||
- `/sync` returns things in the order they arrive at the server (`stream_ordering`).
|
||||
- `/messages` (and `/backfill` in the federation API) return them in the order determined by the event graph `(topological_ordering, stream_ordering)`.
|
||||
- Incremental `/sync?since=xxx` returns things in the order they arrive at the server
|
||||
(`stream_ordering`).
|
||||
- Initial `/sync`, `/messages` (and `/backfill` in the federation API) return them in
|
||||
the order determined by the event graph `(topological_ordering, stream_ordering)`.
|
||||
|
||||
The general idea is that, if you're following a room in real-time (i.e.
|
||||
`/sync`), you probably want to see the messages as they arrive at your server,
|
||||
|
||||
94
docs/element_logo_white_bg.svg
Normal file
94
docs/element_logo_white_bg.svg
Normal file
@@ -0,0 +1,94 @@
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<!-- Created with Inkscape (http://www.inkscape.org/) -->
|
||||
|
||||
<svg
|
||||
width="41.440346mm"
|
||||
height="10.383124mm"
|
||||
viewBox="0 0 41.440346 10.383125"
|
||||
version="1.1"
|
||||
id="svg1"
|
||||
xml:space="preserve"
|
||||
sodipodi:docname="element_logo_white_bg.svg"
|
||||
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
|
||||
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
|
||||
xmlns="http://www.w3.org/2000/svg"
|
||||
xmlns:svg="http://www.w3.org/2000/svg"><sodipodi:namedview
|
||||
id="namedview1"
|
||||
pagecolor="#ffffff"
|
||||
bordercolor="#000000"
|
||||
borderopacity="0.25"
|
||||
inkscape:showpageshadow="2"
|
||||
inkscape:pageopacity="0.0"
|
||||
inkscape:pagecheckerboard="0"
|
||||
inkscape:deskcolor="#d1d1d1"
|
||||
inkscape:document-units="mm"
|
||||
showgrid="false"
|
||||
inkscape:export-bgcolor="#ffffffff" /><defs
|
||||
id="defs1" /><g
|
||||
id="layer1"
|
||||
transform="translate(-84.803844,-143.2075)"
|
||||
inkscape:export-filename="element_logo_white_bg.svg"
|
||||
inkscape:export-xdpi="96"
|
||||
inkscape:export-ydpi="96"><g
|
||||
style="fill:none"
|
||||
id="g1"
|
||||
transform="matrix(0.26458333,0,0,0.26458333,85.841658,144.26667)"><rect
|
||||
style="display:inline;fill:#ffffff;fill-opacity:1;stroke:#ffffff;stroke-width:1.31041;stroke-dasharray:none;stroke-opacity:1"
|
||||
id="rect20"
|
||||
width="155.31451"
|
||||
height="37.932892"
|
||||
x="-3.2672384"
|
||||
y="-3.3479743"
|
||||
rx="3.3718522"
|
||||
ry="3.7915266"
|
||||
transform="translate(-2.1259843e-6)"
|
||||
inkscape:label="rect20"
|
||||
inkscape:export-filename="rect20.svg"
|
||||
inkscape:export-xdpi="96"
|
||||
inkscape:export-ydpi="96" /><path
|
||||
fill-rule="evenodd"
|
||||
clip-rule="evenodd"
|
||||
d="M 16,32 C 24.8366,32 32,24.8366 32,16 32,7.16344 24.8366,0 16,0 7.16344,0 0,7.16344 0,16 0,24.8366 7.16344,32 16,32 Z"
|
||||
fill="#0dbd8b"
|
||||
id="path1" /><path
|
||||
fill-rule="evenodd"
|
||||
clip-rule="evenodd"
|
||||
d="m 13.0756,7.455 c 0,-0.64584 0.5247,-1.1694 1.1719,-1.1694 4.3864,0 7.9423,3.54853 7.9423,7.9259 0,0.6458 -0.5246,1.1694 -1.1718,1.1694 -0.6472,0 -1.1719,-0.5236 -1.1719,-1.1694 0,-3.0857 -2.5066,-5.58711 -5.5986,-5.58711 -0.6472,0 -1.1719,-0.52355 -1.1719,-1.16939 z"
|
||||
fill="#ffffff"
|
||||
id="path2" /><path
|
||||
fill-rule="evenodd"
|
||||
clip-rule="evenodd"
|
||||
d="m 24.5424,13.042 c 0.6472,0 1.1719,0.5235 1.1719,1.1694 0,4.3773 -3.5559,7.9258 -7.9424,7.9258 -0.6472,0 -1.1718,-0.5235 -1.1718,-1.1693 0,-0.6459 0.5246,-1.1694 1.1718,-1.1694 3.0921,0 5.5987,-2.5015 5.5987,-5.5871 0,-0.6459 0.5247,-1.1694 1.1718,-1.1694 z"
|
||||
fill="#ffffff"
|
||||
id="path3" /><path
|
||||
fill-rule="evenodd"
|
||||
clip-rule="evenodd"
|
||||
d="m 18.9446,24.5446 c 0,0.6459 -0.5247,1.1694 -1.1718,1.1694 -4.3865,0 -7.94239,-3.5485 -7.94239,-7.9258 0,-0.6459 0.52469,-1.1694 1.17179,-1.1694 0.6472,0 1.1719,0.5235 1.1719,1.1694 0,3.0856 2.5066,5.587 5.5987,5.587 0.6471,0 1.1718,0.5236 1.1718,1.1694 z"
|
||||
fill="#ffffff"
|
||||
id="path4" /><path
|
||||
fill-rule="evenodd"
|
||||
clip-rule="evenodd"
|
||||
d="m 7.45823,18.9576 c -0.64718,0 -1.17183,-0.5235 -1.17183,-1.1694 0,-4.3773 3.55591,-7.92581 7.9423,-7.92581 0.6472,0 1.1719,0.52351 1.1719,1.16941 0,0.6458 -0.5247,1.1694 -1.1719,1.1694 -3.092,0 -5.59864,2.5014 -5.59864,5.587 0,0.6459 -0.52465,1.1694 -1.17183,1.1694 z"
|
||||
fill="#ffffff"
|
||||
id="path5" /><path
|
||||
d="M 56.2856,18.1428 H 44.9998 c 0.1334,1.181 0.5619,2.1238 1.2858,2.8286 0.7238,0.6857 1.6761,1.0286 2.8571,1.0286 0.7809,0 1.4857,-0.1905 2.1143,-0.5715 0.6286,-0.3809 1.0762,-0.8952 1.3428,-1.5428 h 3.4286 c -0.4571,1.5047 -1.3143,2.7238 -2.5714,3.6571 -1.2381,0.9143 -2.7048,1.3715 -4.4,1.3715 -2.2095,0 -4,-0.7334 -5.3714,-2.2 -1.3524,-1.4667 -2.0286,-3.3239 -2.0286,-5.5715 0,-2.1905 0.6857,-4.0285 2.0571,-5.5143 1.3715,-1.4857 3.1429,-2.22853 5.3143,-2.22853 2.1714,0 3.9238,0.73333 5.2572,2.20003 1.3523,1.4476 2.0285,3.2762 2.0285,5.4857 z m -7.2572,-5.9714 c -1.0667,0 -1.9524,0.3143 -2.6571,0.9429 -0.7048,0.6285 -1.1429,1.4666 -1.3143,2.5142 h 7.8857 c -0.1524,-1.0476 -0.5714,-1.8857 -1.2571,-2.5142 -0.6858,-0.6286 -1.5715,-0.9429 -2.6572,-0.9429 z"
|
||||
fill="#000000"
|
||||
id="path6" /><path
|
||||
d="M 58.6539,20.1428 V 3.14282 h 3.4 V 20.2 c 0,0.7619 0.419,1.1428 1.2571,1.1428 l 0.6,-0.0285 v 3.2285 c -0.3238,0.0572 -0.6667,0.0857 -1.0286,0.0857 -1.4666,0 -2.5428,-0.3714 -3.2285,-1.1142 -0.6667,-0.7429 -1,-1.8667 -1,-3.3715 z"
|
||||
fill="#000000"
|
||||
id="path7" /><path
|
||||
d="M 79.7454,18.1428 H 68.4597 c 0.1333,1.181 0.5619,2.1238 1.2857,2.8286 0.7238,0.6857 1.6762,1.0286 2.8571,1.0286 0.781,0 1.4857,-0.1905 2.1143,-0.5715 0.6286,-0.3809 1.0762,-0.8952 1.3429,-1.5428 h 3.4285 c -0.4571,1.5047 -1.3143,2.7238 -2.5714,3.6571 -1.2381,0.9143 -2.7048,1.3715 -4.4,1.3715 -2.2095,0 -4,-0.7334 -5.3714,-2.2 -1.3524,-1.4667 -2.0286,-3.3239 -2.0286,-5.5715 0,-2.1905 0.6857,-4.0285 2.0571,-5.5143 1.3715,-1.4857 3.1429,-2.22853 5.3143,-2.22853 2.1715,0 3.9238,0.73333 5.2572,2.20003 1.3524,1.4476 2.0285,3.2762 2.0285,5.4857 z m -7.2572,-5.9714 c -1.0666,0 -1.9524,0.3143 -2.6571,0.9429 -0.7048,0.6285 -1.1429,1.4666 -1.3143,2.5142 h 7.8857 c -0.1524,-1.0476 -0.5714,-1.8857 -1.2571,-2.5142 -0.6857,-0.6286 -1.5715,-0.9429 -2.6572,-0.9429 z"
|
||||
fill="#000000"
|
||||
id="path8" /><path
|
||||
d="m 95.0851,16.0571 v 8.5143 h -3.4 v -8.8857 c 0,-2.2476 -0.9333,-3.3714 -2.8,-3.3714 -1.0095,0 -1.819,0.3238 -2.4286,0.9714 -0.5904,0.6476 -0.8857,1.5333 -0.8857,2.6571 v 8.6286 h -3.4 V 9.74282 h 3.1429 v 1.97148 c 0.3619,-0.6667 0.9143,-1.2191 1.6571,-1.6572 0.7429,-0.43809 1.6667,-0.65713 2.7714,-0.65713 2.0572,0 3.5429,0.78093 4.4572,2.34283 1.2571,-1.5619 2.9333,-2.34283 5.0286,-2.34283 1.733,0 3.067,0.54285 4,1.62853 0.933,1.0667 1.4,2.4762 1.4,4.2286 v 9.3143 h -3.4 v -8.8857 c 0,-2.2476 -0.933,-3.3714 -2.8,-3.3714 -1.0286,0 -1.8477,0.3333 -2.4572,1 -0.5905,0.6476 -0.8857,1.5619 -0.8857,2.7428 z"
|
||||
fill="#000000"
|
||||
id="path9" /><path
|
||||
d="m 121.537,18.1428 h -11.286 c 0.133,1.181 0.562,2.1238 1.286,2.8286 0.723,0.6857 1.676,1.0286 2.857,1.0286 0.781,0 1.486,-0.1905 2.114,-0.5715 0.629,-0.3809 1.076,-0.8952 1.343,-1.5428 h 3.429 c -0.458,1.5047 -1.315,2.7238 -2.572,3.6571 -1.238,0.9143 -2.705,1.3715 -4.4,1.3715 -2.209,0 -4,-0.7334 -5.371,-2.2 -1.353,-1.4667 -2.029,-3.3239 -2.029,-5.5715 0,-2.1905 0.686,-4.0285 2.057,-5.5143 1.372,-1.4857 3.143,-2.22853 5.315,-2.22853 2.171,0 3.923,0.73333 5.257,2.20003 1.352,1.4476 2.028,3.2762 2.028,5.4857 z m -7.257,-5.9714 c -1.067,0 -1.953,0.3143 -2.658,0.9429 -0.704,0.6285 -1.142,1.4666 -1.314,2.5142 h 7.886 c -0.153,-1.0476 -0.572,-1.8857 -1.257,-2.5142 -0.686,-0.6286 -1.572,-0.9429 -2.657,-0.9429 z"
|
||||
fill="#000000"
|
||||
id="path10" /><path
|
||||
d="m 127.105,9.74282 v 1.97148 c 0.343,-0.6477 0.905,-1.1905 1.686,-1.6286 0.8,-0.45716 1.762,-0.68573 2.885,-0.68573 1.753,0 3.105,0.53333 4.058,1.60003 0.971,1.0666 1.457,2.4857 1.457,4.2571 v 9.3143 h -3.4 v -8.8857 c 0,-1.0476 -0.248,-1.8667 -0.743,-2.4572 -0.476,-0.6095 -1.21,-0.9142 -2.2,-0.9142 -1.086,0 -1.943,0.3238 -2.572,0.9714 -0.609,0.6476 -0.914,1.5428 -0.914,2.6857 v 8.6 h -3.4 V 9.74282 Z"
|
||||
fill="#000000"
|
||||
id="path11" /><path
|
||||
d="m 147.12,21.5428 v 2.9429 c -0.419,0.1143 -1.009,0.1714 -1.771,0.1714 -2.895,0 -4.343,-1.4571 -4.343,-4.3714 v -7.8286 h -2.257 V 9.74282 h 2.257 V 5.88568 h 3.4 v 3.85714 h 2.772 v 2.71428 h -2.772 v 7.4857 c 0,1.1619 0.552,1.7429 1.657,1.7429 z"
|
||||
fill="#000000"
|
||||
id="path12" /></g></g></svg>
|
||||
|
After Width: | Height: | Size: 7.5 KiB |
@@ -67,7 +67,7 @@ in Synapse can be deactivated.
|
||||
**NOTE**: This has an impact on security and is for testing purposes only!
|
||||
|
||||
To deactivate the certificate validation, the following setting must be added to
|
||||
your [homserver.yaml](../usage/configuration/homeserver_sample_config.md).
|
||||
your [homeserver.yaml](../usage/configuration/homeserver_sample_config.md).
|
||||
|
||||
```yaml
|
||||
use_insecure_ssl_client_just_for_testing_do_not_use: true
|
||||
|
||||
@@ -309,7 +309,62 @@ sudo dnf install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
|
||||
libwebp-devel libxml2-devel libxslt-devel libpq-devel \
|
||||
python3-virtualenv libffi-devel openssl-devel python3-devel \
|
||||
libicu-devel
|
||||
sudo dnf groupinstall "Development Tools"
|
||||
sudo dnf group install "Development Tools"
|
||||
```
|
||||
|
||||
##### Red Hat Enterprise Linux / Rocky Linux
|
||||
|
||||
*Note: The term "RHEL" below refers to both Red Hat Enterprise Linux and Rocky Linux. The distributions are 1:1 binary compatible.*
|
||||
|
||||
It's recommended to use the latest Python versions.
|
||||
|
||||
RHEL 8 in particular ships with Python 3.6 by default which is EOL and therefore no longer supported by Synapse. RHEL 9 ship with Python 3.9 which is still supported by the Python core team as of this writing. However, newer Python versions provide significant performance improvements and they're available in official distributions' repositories. Therefore it's recommended to use them.
|
||||
|
||||
Python 3.11 and 3.12 are available for both RHEL 8 and 9.
|
||||
|
||||
These commands should be run as root user.
|
||||
|
||||
RHEL 8
|
||||
```bash
|
||||
# Enable PowerTools repository
|
||||
dnf config-manager --set-enabled powertools
|
||||
```
|
||||
RHEL 9
|
||||
```bash
|
||||
# Enable CodeReady Linux Builder repository
|
||||
crb enable
|
||||
```
|
||||
|
||||
Install new version of Python. You only need one of these:
|
||||
```bash
|
||||
# Python 3.11
|
||||
dnf install python3.11 python3.11-devel
|
||||
```
|
||||
```bash
|
||||
# Python 3.12
|
||||
dnf install python3.12 python3.12-devel
|
||||
```
|
||||
Finally, install common prerequisites
|
||||
```bash
|
||||
dnf install libicu libicu-devel libpq5 libpq5-devel lz4 pkgconf
|
||||
dnf group install "Development Tools"
|
||||
```
|
||||
###### Using venv module instead of virtualenv command
|
||||
|
||||
It's recommended to use Python venv module directly rather than the virtualenv command.
|
||||
* On RHEL 9, virtualenv is only available on [EPEL](https://docs.fedoraproject.org/en-US/epel/).
|
||||
* On RHEL 8, virtualenv is based on Python 3.6. It does not support creating 3.11/3.12 virtual environments.
|
||||
|
||||
Here's an example of creating Python 3.12 virtual environment and installing Synapse from PyPI.
|
||||
|
||||
```bash
|
||||
mkdir -p ~/synapse
|
||||
# To use Python 3.11, simply use the command "python3.11" instead.
|
||||
python3.12 -m venv ~/synapse/env
|
||||
source ~/synapse/env/bin/activate
|
||||
pip install --upgrade pip
|
||||
pip install --upgrade setuptools
|
||||
pip install matrix-synapse
|
||||
```
|
||||
|
||||
##### macOS
|
||||
|
||||
@@ -119,13 +119,14 @@ stacking them up. You can monitor the currently running background updates with
|
||||
|
||||
# Upgrading to v1.111.0
|
||||
|
||||
## New worker endpoints for authenticated client media
|
||||
## New worker endpoints for authenticated client and federation media
|
||||
|
||||
[Media repository workers](./workers.md#synapseappmedia_repository) handling
|
||||
Media APIs can now handle the following endpoint pattern:
|
||||
Media APIs can now handle the following endpoint patterns:
|
||||
|
||||
```
|
||||
^/_matrix/client/v1/media/.*$
|
||||
^/_matrix/federation/v1/media/.*$
|
||||
```
|
||||
|
||||
Please update your reverse proxy configuration.
|
||||
|
||||
@@ -246,6 +246,7 @@ Example configuration:
|
||||
```yaml
|
||||
presence:
|
||||
enabled: false
|
||||
include_offline_users_on_sync: false
|
||||
```
|
||||
|
||||
`enabled` can also be set to a special value of "untracked" which ignores updates
|
||||
@@ -254,6 +255,10 @@ received via clients and federation, while still accepting updates from the
|
||||
|
||||
*The "untracked" option was added in Synapse 1.96.0.*
|
||||
|
||||
When clients perform an initial or `full_state` sync, presence results for offline users are
|
||||
not included by default. Setting `include_offline_users_on_sync` to `true` will always include
|
||||
offline users in the results. Defaults to false.
|
||||
|
||||
---
|
||||
### `require_auth_for_profile_requests`
|
||||
|
||||
@@ -1863,6 +1868,18 @@ federation_rr_transactions_per_room_per_second: 40
|
||||
## Media Store
|
||||
Config options related to Synapse's media store.
|
||||
|
||||
---
|
||||
### `enable_authenticated_media`
|
||||
|
||||
When set to true, all subsequent media uploads will be marked as authenticated, and will not be available over legacy
|
||||
unauthenticated media endpoints (`/_matrix/media/(r0|v3|v1)/download` and `/_matrix/media/(r0|v3|v1)/thumbnail`) - requests for authenticated media over these endpoints will result in a 404. All media, including authenticated media, will be available over the authenticated media endpoints `_matrix/client/v1/media/download` and `_matrix/client/v1/media/thumbnail`. Media uploaded prior to setting this option to true will still be available over the legacy endpoints. Note if the setting is switched to false
|
||||
after enabling, media marked as authenticated will be available over legacy endpoints. Defaults to false, but
|
||||
this will change to true in a future Synapse release.
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
enable_authenticated_media: true
|
||||
```
|
||||
---
|
||||
### `enable_media_repo`
|
||||
|
||||
@@ -2369,7 +2386,7 @@ enable_registration_without_verification: true
|
||||
---
|
||||
### `registrations_require_3pid`
|
||||
|
||||
If this is set, users must provide all of the specified types of 3PID when registering an account.
|
||||
If this is set, users must provide all of the specified types of [3PID](https://spec.matrix.org/latest/appendices/#3pid-types) when registering an account.
|
||||
|
||||
Note that [`enable_registration`](#enable_registration) must also be set to allow account registration.
|
||||
|
||||
@@ -2394,6 +2411,9 @@ disable_msisdn_registration: true
|
||||
|
||||
Mandate that users are only allowed to associate certain formats of
|
||||
3PIDs with accounts on this server, as specified by the `medium` and `pattern` sub-options.
|
||||
`pattern` is a [Perl-like regular expression](https://docs.python.org/3/library/re.html#module-re).
|
||||
|
||||
More information about 3PIDs, allowed `medium` types and their `address` syntax can be found [in the Matrix spec](https://spec.matrix.org/latest/appendices/#3pid-types).
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
@@ -2403,7 +2423,7 @@ allowed_local_3pids:
|
||||
- medium: email
|
||||
pattern: '^[^@]+@vector\.im$'
|
||||
- medium: msisdn
|
||||
pattern: '\+44'
|
||||
pattern: '^44\d{10}$'
|
||||
```
|
||||
---
|
||||
### `enable_3pid_lookup`
|
||||
@@ -4134,6 +4154,38 @@ default_power_level_content_override:
|
||||
trusted_private_chat: null
|
||||
public_chat: null
|
||||
```
|
||||
|
||||
The default power levels for each preset are:
|
||||
```yaml
|
||||
"m.room.name": 50
|
||||
"m.room.power_levels": 100
|
||||
"m.room.history_visibility": 100
|
||||
"m.room.canonical_alias": 50
|
||||
"m.room.avatar": 50
|
||||
"m.room.tombstone": 100
|
||||
"m.room.server_acl": 100
|
||||
"m.room.encryption": 100
|
||||
```
|
||||
|
||||
So a complete example where the default power-levels for a preset are maintained
|
||||
but the power level for a new key is set is:
|
||||
```yaml
|
||||
default_power_level_content_override:
|
||||
private_chat:
|
||||
events:
|
||||
"com.example.foo": 0
|
||||
"m.room.name": 50
|
||||
"m.room.power_levels": 100
|
||||
"m.room.history_visibility": 100
|
||||
"m.room.canonical_alias": 50
|
||||
"m.room.avatar": 50
|
||||
"m.room.tombstone": 100
|
||||
"m.room.server_acl": 100
|
||||
"m.room.encryption": 100
|
||||
trusted_private_chat: null
|
||||
public_chat: null
|
||||
```
|
||||
|
||||
---
|
||||
### `forget_rooms_on_leave`
|
||||
|
||||
@@ -4633,7 +4685,9 @@ This setting has the following sub-options:
|
||||
* `only_for_direct_messages`: Whether invites should be automatically accepted for all room types, or only
|
||||
for direct messages. Defaults to false.
|
||||
* `only_from_local_users`: Whether to only automatically accept invites from users on this homeserver. Defaults to false.
|
||||
* `worker_to_run_on`: Which worker to run this module on. This must match the "worker_name".
|
||||
* `worker_to_run_on`: Which worker to run this module on. This must match
|
||||
the "worker_name". If not set or `null`, invites will be accepted on the
|
||||
main process.
|
||||
|
||||
NOTE: Care should be taken not to enable this setting if the `synapse_auto_accept_invite` module is enabled and installed.
|
||||
The two modules will compete to perform the same task and may result in undesired behaviour. For example, multiple join
|
||||
|
||||
@@ -740,6 +740,7 @@ Handles the media repository. It can handle all endpoints starting with:
|
||||
|
||||
/_matrix/media/
|
||||
/_matrix/client/v1/media/
|
||||
/_matrix/federation/v1/media/
|
||||
|
||||
... and the following regular expressions matching media-specific administration APIs:
|
||||
|
||||
|
||||
867
poetry.lock
generated
867
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
|
||||
|
||||
[tool.poetry]
|
||||
name = "matrix-synapse"
|
||||
version = "1.111.0rc1"
|
||||
version = "1.113.0"
|
||||
description = "Homeserver for the Matrix decentralised comms protocol"
|
||||
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
||||
license = "AGPL-3.0-or-later"
|
||||
@@ -201,8 +201,8 @@ netaddr = ">=0.7.18"
|
||||
# add a lower bound to the Jinja2 dependency.
|
||||
Jinja2 = ">=3.0"
|
||||
bleach = ">=1.4.3"
|
||||
# We use `Self`, which were added in `typing-extensions` 4.0.
|
||||
typing-extensions = ">=4.0"
|
||||
# We use `assert_never`, which were added in `typing-extensions` 4.1.
|
||||
typing-extensions = ">=4.1"
|
||||
# We enforce that we have a `cryptography` version that bundles an `openssl`
|
||||
# with the latest security patches.
|
||||
cryptography = ">=3.4.7"
|
||||
@@ -322,7 +322,7 @@ all = [
|
||||
# This helps prevents merge conflicts when running a batch of dependabot updates.
|
||||
isort = ">=5.10.1"
|
||||
black = ">=22.7.0"
|
||||
ruff = "0.5.0"
|
||||
ruff = "0.5.5"
|
||||
# Type checking only works with the pydantic.v1 compat module from pydantic v2
|
||||
pydantic = "^2"
|
||||
|
||||
|
||||
@@ -43,7 +43,7 @@ import argparse
|
||||
import base64
|
||||
import json
|
||||
import sys
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
from typing import Any, Dict, Mapping, Optional, Tuple, Union
|
||||
from urllib import parse as urlparse
|
||||
|
||||
import requests
|
||||
@@ -75,7 +75,7 @@ def encode_canonical_json(value: object) -> bytes:
|
||||
value,
|
||||
# Encode code-points outside of ASCII as UTF-8 rather than \u escapes
|
||||
ensure_ascii=False,
|
||||
# Remove unecessary white space.
|
||||
# Remove unnecessary white space.
|
||||
separators=(",", ":"),
|
||||
# Sort the keys of dictionaries.
|
||||
sort_keys=True,
|
||||
@@ -298,12 +298,23 @@ class MatrixConnectionAdapter(HTTPAdapter):
|
||||
|
||||
return super().send(request, *args, **kwargs)
|
||||
|
||||
def get_connection(
|
||||
self, url: str, proxies: Optional[Dict[str, str]] = None
|
||||
def get_connection_with_tls_context(
|
||||
self,
|
||||
request: PreparedRequest,
|
||||
verify: Optional[Union[bool, str]],
|
||||
proxies: Optional[Mapping[str, str]] = None,
|
||||
cert: Optional[Union[Tuple[str, str], str]] = None,
|
||||
) -> HTTPConnectionPool:
|
||||
# overrides the get_connection() method in the base class
|
||||
parsed = urlparse.urlsplit(url)
|
||||
(host, port, ssl_server_name) = self._lookup(parsed.netloc)
|
||||
# overrides the get_connection_with_tls_context() method in the base class
|
||||
parsed = urlparse.urlsplit(request.url)
|
||||
|
||||
# Extract the server name from the request URL, and ensure it's a str.
|
||||
hostname = parsed.netloc
|
||||
if isinstance(hostname, bytes):
|
||||
hostname = hostname.decode("utf-8")
|
||||
assert isinstance(hostname, str)
|
||||
|
||||
(host, port, ssl_server_name) = self._lookup(hostname)
|
||||
print(
|
||||
f"Connecting to {host}:{port} with SNI {ssl_server_name}", file=sys.stderr
|
||||
)
|
||||
|
||||
@@ -324,6 +324,11 @@ def tag(gh_token: Optional[str]) -> None:
|
||||
def _tag(gh_token: Optional[str]) -> None:
|
||||
"""Tags the release and generates a draft GitHub release"""
|
||||
|
||||
if gh_token:
|
||||
# Test that the GH Token is valid before continuing.
|
||||
gh = Github(gh_token)
|
||||
gh.get_user()
|
||||
|
||||
# Make sure we're in a git repo.
|
||||
repo = get_repo_and_check_clean_checkout()
|
||||
|
||||
@@ -418,6 +423,11 @@ def publish(gh_token: str) -> None:
|
||||
def _publish(gh_token: str) -> None:
|
||||
"""Publish release on GitHub."""
|
||||
|
||||
if gh_token:
|
||||
# Test that the GH Token is valid before continuing.
|
||||
gh = Github(gh_token)
|
||||
gh.get_user()
|
||||
|
||||
# Make sure we're in a git repo.
|
||||
get_repo_and_check_clean_checkout()
|
||||
|
||||
@@ -460,6 +470,11 @@ def upload(gh_token: Optional[str]) -> None:
|
||||
def _upload(gh_token: Optional[str]) -> None:
|
||||
"""Upload release to pypi."""
|
||||
|
||||
if gh_token:
|
||||
# Test that the GH Token is valid before continuing.
|
||||
gh = Github(gh_token)
|
||||
gh.get_user()
|
||||
|
||||
current_version = get_package_version()
|
||||
tag_name = f"v{current_version}"
|
||||
|
||||
@@ -555,6 +570,11 @@ def wait_for_actions(gh_token: Optional[str]) -> None:
|
||||
|
||||
|
||||
def _wait_for_actions(gh_token: Optional[str]) -> None:
|
||||
if gh_token:
|
||||
# Test that the GH Token is valid before continuing.
|
||||
gh = Github(gh_token)
|
||||
gh.get_user()
|
||||
|
||||
# Find out the version and tag name.
|
||||
current_version = get_package_version()
|
||||
tag_name = f"v{current_version}"
|
||||
@@ -711,6 +731,11 @@ Ask the designated people to do the blog and tweets."""
|
||||
@cli.command()
|
||||
@click.option("--gh-token", envvar=["GH_TOKEN", "GITHUB_TOKEN"], required=True)
|
||||
def full(gh_token: str) -> None:
|
||||
if gh_token:
|
||||
# Test that the GH Token is valid before continuing.
|
||||
gh = Github(gh_token)
|
||||
gh.get_user()
|
||||
|
||||
click.echo("1. If this is a security release, read the security wiki page.")
|
||||
click.echo("2. Check for any release blockers before proceeding.")
|
||||
click.echo(" https://github.com/element-hq/synapse/labels/X-Release-Blocker")
|
||||
|
||||
@@ -119,18 +119,19 @@ BOOLEAN_COLUMNS = {
|
||||
"e2e_room_keys": ["is_verified"],
|
||||
"event_edges": ["is_state"],
|
||||
"events": ["processed", "outlier", "contains_url"],
|
||||
"local_media_repository": ["safe_from_quarantine"],
|
||||
"local_media_repository": ["safe_from_quarantine", "authenticated"],
|
||||
"per_user_experimental_features": ["enabled"],
|
||||
"presence_list": ["accepted"],
|
||||
"presence_stream": ["currently_active"],
|
||||
"public_room_list_stream": ["visibility"],
|
||||
"pushers": ["enabled"],
|
||||
"redactions": ["have_censored"],
|
||||
"remote_media_cache": ["authenticated"],
|
||||
"room_stats_state": ["is_federatable"],
|
||||
"rooms": ["is_public", "has_auth_chain_index"],
|
||||
"users": ["shadow_banned", "approved", "locked", "suspended"],
|
||||
"un_partial_stated_event_stream": ["rejection_status_changed"],
|
||||
"users_who_share_rooms": ["share_private"],
|
||||
"per_user_experimental_features": ["enabled"],
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ class Membership:
|
||||
KNOCK: Final = "knock"
|
||||
LEAVE: Final = "leave"
|
||||
BAN: Final = "ban"
|
||||
LIST: Final = {INVITE, JOIN, KNOCK, LEAVE, BAN}
|
||||
LIST: Final = frozenset((INVITE, JOIN, KNOCK, LEAVE, BAN))
|
||||
|
||||
|
||||
class PresenceState:
|
||||
@@ -225,6 +225,11 @@ class EventContentFields:
|
||||
# This is deprecated in MSC2175.
|
||||
ROOM_CREATOR: Final = "creator"
|
||||
|
||||
# The version of the room for `m.room.create` events.
|
||||
ROOM_VERSION: Final = "room_version"
|
||||
|
||||
ROOM_NAME: Final = "name"
|
||||
|
||||
# Used in m.room.guest_access events.
|
||||
GUEST_ACCESS: Final = "guest_access"
|
||||
|
||||
@@ -237,6 +242,9 @@ class EventContentFields:
|
||||
# an unspecced field added to to-device messages to identify them uniquely-ish
|
||||
TO_DEVICE_MSGID: Final = "org.matrix.msgid"
|
||||
|
||||
# `m.room.encryption`` algorithm field
|
||||
ENCRYPTION_ALGORITHM: Final = "algorithm"
|
||||
|
||||
|
||||
class EventUnsignedContentFields:
|
||||
"""Fields found inside the 'unsigned' data on events"""
|
||||
|
||||
@@ -128,6 +128,10 @@ class Codes(str, Enum):
|
||||
# MSC2677
|
||||
DUPLICATE_ANNOTATION = "M_DUPLICATE_ANNOTATION"
|
||||
|
||||
# MSC3575 we are telling the client they need to expire their sliding sync
|
||||
# connection.
|
||||
UNKNOWN_POS = "M_UNKNOWN_POS"
|
||||
|
||||
|
||||
class CodeMessageException(RuntimeError):
|
||||
"""An exception with integer code, a message string attributes and optional headers.
|
||||
@@ -847,3 +851,17 @@ class PartialStateConflictError(SynapseError):
|
||||
msg=PartialStateConflictError.message(),
|
||||
errcode=Codes.UNKNOWN,
|
||||
)
|
||||
|
||||
|
||||
class SlidingSyncUnknownPosition(SynapseError):
|
||||
"""An error that Synapse can return to signal to the client to expire their
|
||||
sliding sync connection (i.e. send a new request without a `?since=`
|
||||
param).
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
msg="Unknown position",
|
||||
errcode=Codes.UNKNOWN_POS,
|
||||
)
|
||||
|
||||
@@ -236,9 +236,8 @@ class Ratelimiter:
|
||||
requester: The requester that is doing the action, if any.
|
||||
key: An arbitrary key used to classify an action. Defaults to the
|
||||
requester's user ID.
|
||||
n_actions: The number of times the user wants to do this action. If the user
|
||||
cannot do all of the actions, the user's action count is not incremented
|
||||
at all.
|
||||
n_actions: The number of times the user performed the action. May be negative
|
||||
to "refund" the rate limit.
|
||||
_time_now_s: The current time. Optional, defaults to the current time according
|
||||
to self.clock. Only used by tests.
|
||||
"""
|
||||
|
||||
@@ -206,6 +206,21 @@ class GenericWorkerServer(HomeServer):
|
||||
"/_synapse/admin": admin_resource,
|
||||
}
|
||||
)
|
||||
|
||||
if "federation" not in res.names:
|
||||
# Only load the federation media resource separately if federation
|
||||
# resource is not specified since federation resource includes media
|
||||
# resource.
|
||||
resources[FEDERATION_PREFIX] = TransportLayerServer(
|
||||
self, servlet_groups=["media"]
|
||||
)
|
||||
if "client" not in res.names:
|
||||
# Only load the client media resource separately if client
|
||||
# resource is not specified since client resource includes media
|
||||
# resource.
|
||||
resources[CLIENT_API_PREFIX] = ClientRestResource(
|
||||
self, servlet_groups=["media"]
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"A 'media' listener is configured but the media"
|
||||
|
||||
@@ -101,6 +101,12 @@ class SynapseHomeServer(HomeServer):
|
||||
# Skip loading openid resource if federation is defined
|
||||
# since federation resource will include openid
|
||||
continue
|
||||
if name == "media" and (
|
||||
"federation" in res.names or "client" in res.names
|
||||
):
|
||||
# Skip loading media resource if federation or client are defined
|
||||
# since federation & client resources will include media
|
||||
continue
|
||||
if name == "health":
|
||||
# Skip loading, health resource is always included
|
||||
continue
|
||||
@@ -217,7 +223,7 @@ class SynapseHomeServer(HomeServer):
|
||||
)
|
||||
|
||||
if name in ["media", "federation", "client"]:
|
||||
if self.config.server.enable_media_repo:
|
||||
if self.config.media.can_load_media_repo:
|
||||
media_repo = self.get_media_repository_resource()
|
||||
resources.update(
|
||||
{
|
||||
@@ -231,6 +237,14 @@ class SynapseHomeServer(HomeServer):
|
||||
"'media' resource conflicts with enable_media_repo=False"
|
||||
)
|
||||
|
||||
if name == "media":
|
||||
resources[FEDERATION_PREFIX] = TransportLayerServer(
|
||||
self, servlet_groups=["media"]
|
||||
)
|
||||
resources[CLIENT_API_PREFIX] = ClientRestResource(
|
||||
self, servlet_groups=["media"]
|
||||
)
|
||||
|
||||
if name in ["keys", "federation"]:
|
||||
resources[SERVER_KEY_PREFIX] = KeyResource(self)
|
||||
|
||||
|
||||
@@ -126,7 +126,7 @@ class ContentRepositoryConfig(Config):
|
||||
# Only enable the media repo if either the media repo is enabled or the
|
||||
# current worker app is the media repo.
|
||||
if (
|
||||
self.root.server.enable_media_repo is False
|
||||
config.get("enable_media_repo", True) is False
|
||||
and config.get("worker_app") != "synapse.app.media_repository"
|
||||
):
|
||||
self.can_load_media_repo = False
|
||||
@@ -272,6 +272,10 @@ class ContentRepositoryConfig(Config):
|
||||
remote_media_lifetime
|
||||
)
|
||||
|
||||
self.enable_authenticated_media = config.get(
|
||||
"enable_authenticated_media", False
|
||||
)
|
||||
|
||||
def generate_config_section(self, data_dir_path: str, **kwargs: Any) -> str:
|
||||
assert data_dir_path is not None
|
||||
media_store = os.path.join(data_dir_path, "media_store")
|
||||
|
||||
@@ -384,6 +384,11 @@ class ServerConfig(Config):
|
||||
# Whether to internally track presence, requires that presence is enabled,
|
||||
self.track_presence = self.presence_enabled and presence_enabled != "untracked"
|
||||
|
||||
# Determines if presence results for offline users are included on initial/full sync
|
||||
self.presence_include_offline_users_on_sync = presence_config.get(
|
||||
"include_offline_users_on_sync", False
|
||||
)
|
||||
|
||||
# Custom presence router module
|
||||
# This is the legacy way of configuring it (the config should now be put in the modules section)
|
||||
self.presence_router_module_class = None
|
||||
@@ -395,12 +400,6 @@ class ServerConfig(Config):
|
||||
self.presence_router_config,
|
||||
) = load_module(presence_router_config, ("presence", "presence_router"))
|
||||
|
||||
# whether to enable the media repository endpoints. This should be set
|
||||
# to false if the media repository is running as a separate endpoint;
|
||||
# doing so ensures that we will not run cache cleanup jobs on the
|
||||
# master, potentially causing inconsistency.
|
||||
self.enable_media_repo = config.get("enable_media_repo", True)
|
||||
|
||||
# Whether to require authentication to retrieve profile data (avatars,
|
||||
# display names) of other users through the client API.
|
||||
self.require_auth_for_profile_requests = config.get(
|
||||
|
||||
@@ -589,7 +589,7 @@ class BaseV2KeyFetcher(KeyFetcher):
|
||||
% (server_name,)
|
||||
)
|
||||
|
||||
for key_id, key_data in response_json["old_verify_keys"].items():
|
||||
for key_id, key_data in response_json.get("old_verify_keys", {}).items():
|
||||
if is_signing_algorithm_supported(key_id):
|
||||
key_base64 = key_data["key"]
|
||||
key_bytes = decode_base64(key_base64)
|
||||
|
||||
@@ -554,3 +554,22 @@ def relation_from_event(event: EventBase) -> Optional[_EventRelation]:
|
||||
aggregation_key = None
|
||||
|
||||
return _EventRelation(parent_id, rel_type, aggregation_key)
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class StrippedStateEvent:
|
||||
"""
|
||||
A stripped down state event. Usually used for remote invite/knocks so the user can
|
||||
make an informed decision on whether they want to join.
|
||||
|
||||
Attributes:
|
||||
type: Event `type`
|
||||
state_key: Event `state_key`
|
||||
sender: Event `sender`
|
||||
content: Event `content`
|
||||
"""
|
||||
|
||||
type: str
|
||||
state_key: str
|
||||
sender: str
|
||||
content: Dict[str, Any]
|
||||
|
||||
@@ -49,7 +49,7 @@ from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.api.room_versions import RoomVersion
|
||||
from synapse.types import JsonDict, Requester
|
||||
|
||||
from . import EventBase, make_event_from_dict
|
||||
from . import EventBase, StrippedStateEvent, make_event_from_dict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
@@ -854,3 +854,30 @@ def strip_event(event: EventBase) -> JsonDict:
|
||||
"content": event.content,
|
||||
"sender": event.sender,
|
||||
}
|
||||
|
||||
|
||||
def parse_stripped_state_event(raw_stripped_event: Any) -> Optional[StrippedStateEvent]:
|
||||
"""
|
||||
Given a raw value from an event's `unsigned` field, attempt to parse it into a
|
||||
`StrippedStateEvent`.
|
||||
"""
|
||||
if isinstance(raw_stripped_event, dict):
|
||||
# All of these fields are required
|
||||
type = raw_stripped_event.get("type")
|
||||
state_key = raw_stripped_event.get("state_key")
|
||||
sender = raw_stripped_event.get("sender")
|
||||
content = raw_stripped_event.get("content")
|
||||
if (
|
||||
isinstance(type, str)
|
||||
and isinstance(state_key, str)
|
||||
and isinstance(sender, str)
|
||||
and isinstance(content, dict)
|
||||
):
|
||||
return StrippedStateEvent(
|
||||
type=type,
|
||||
state_key=state_key,
|
||||
sender=sender,
|
||||
content=content,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
@@ -271,6 +271,10 @@ SERVLET_GROUPS: Dict[str, Iterable[Type[BaseFederationServlet]]] = {
|
||||
"federation": FEDERATION_SERVLET_CLASSES,
|
||||
"room_list": (PublicRoomList,),
|
||||
"openid": (OpenIdUserInfo,),
|
||||
"media": (
|
||||
FederationMediaDownloadServlet,
|
||||
FederationMediaThumbnailServlet,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@@ -321,7 +325,7 @@ def register_servlets(
|
||||
servletclass == FederationMediaDownloadServlet
|
||||
or servletclass == FederationMediaThumbnailServlet
|
||||
):
|
||||
if not hs.config.server.enable_media_repo:
|
||||
if not hs.config.media.can_load_media_repo:
|
||||
continue
|
||||
|
||||
servletclass(
|
||||
|
||||
@@ -912,6 +912,4 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
|
||||
FederationV1SendKnockServlet,
|
||||
FederationMakeKnockServlet,
|
||||
FederationAccountStatusServlet,
|
||||
FederationMediaDownloadServlet,
|
||||
FederationMediaThumbnailServlet,
|
||||
)
|
||||
|
||||
@@ -197,8 +197,14 @@ class AdminHandler:
|
||||
# events that we have and then filtering, this isn't the most
|
||||
# efficient method perhaps but it does guarantee we get everything.
|
||||
while True:
|
||||
events, _ = await self._store.paginate_room_events(
|
||||
room_id, from_key, to_key, limit=100, direction=Direction.FORWARDS
|
||||
events, _ = (
|
||||
await self._store.paginate_room_events_by_topological_ordering(
|
||||
room_id=room_id,
|
||||
from_key=from_key,
|
||||
to_key=to_key,
|
||||
limit=100,
|
||||
direction=Direction.FORWARDS,
|
||||
)
|
||||
)
|
||||
if not events:
|
||||
break
|
||||
|
||||
@@ -20,10 +20,20 @@
|
||||
#
|
||||
#
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Set, Tuple
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AbstractSet,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
from synapse.api import errors
|
||||
from synapse.api.constants import EduTypes, EventTypes
|
||||
from synapse.api.constants import EduTypes, EventTypes, Membership
|
||||
from synapse.api.errors import (
|
||||
Codes,
|
||||
FederationDeniedError,
|
||||
@@ -38,7 +48,9 @@ from synapse.metrics.background_process_metrics import (
|
||||
wrap_as_background_process,
|
||||
)
|
||||
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
|
||||
from synapse.storage.databases.main.state_deltas import StateDelta
|
||||
from synapse.types import (
|
||||
DeviceListUpdates,
|
||||
JsonDict,
|
||||
JsonMapping,
|
||||
ScheduledTask,
|
||||
@@ -214,138 +226,210 @@ class DeviceWorkerHandler:
|
||||
@cancellable
|
||||
async def get_user_ids_changed(
|
||||
self, user_id: str, from_token: StreamToken
|
||||
) -> JsonDict:
|
||||
) -> DeviceListUpdates:
|
||||
"""Get list of users that have had the devices updated, or have newly
|
||||
joined a room, that `user_id` may be interested in.
|
||||
"""
|
||||
|
||||
set_tag("user_id", user_id)
|
||||
set_tag("from_token", str(from_token))
|
||||
now_room_key = self.store.get_room_max_token()
|
||||
|
||||
room_ids = await self.store.get_rooms_for_user(user_id)
|
||||
now_token = self._event_sources.get_current_token()
|
||||
|
||||
changed = await self.get_device_changes_in_shared_rooms(
|
||||
user_id, room_ids, from_token
|
||||
# We need to work out all the different membership changes for the user
|
||||
# and user they share a room with, to pass to
|
||||
# `generate_sync_entry_for_device_list`. See its docstring for details
|
||||
# on the data required.
|
||||
|
||||
joined_room_ids = await self.store.get_rooms_for_user(user_id)
|
||||
|
||||
# Get the set of rooms that the user has joined/left
|
||||
membership_changes = (
|
||||
await self.store.get_current_state_delta_membership_changes_for_user(
|
||||
user_id, from_key=from_token.room_key, to_key=now_token.room_key
|
||||
)
|
||||
)
|
||||
|
||||
# Then work out if any users have since joined
|
||||
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
|
||||
# Check for newly joined or left rooms. We need to make sure that we add
|
||||
# to newly joined in the case membership goes from join -> leave -> join
|
||||
# again.
|
||||
newly_joined_rooms: Set[str] = set()
|
||||
newly_left_rooms: Set[str] = set()
|
||||
for change in membership_changes:
|
||||
# We check for changes in "joinedness", i.e. if the membership has
|
||||
# changed to or from JOIN.
|
||||
if change.membership == Membership.JOIN:
|
||||
if change.prev_membership != Membership.JOIN:
|
||||
newly_joined_rooms.add(change.room_id)
|
||||
newly_left_rooms.discard(change.room_id)
|
||||
elif change.prev_membership == Membership.JOIN:
|
||||
newly_joined_rooms.discard(change.room_id)
|
||||
newly_left_rooms.add(change.room_id)
|
||||
|
||||
member_events = await self.store.get_membership_changes_for_user(
|
||||
user_id, from_token.room_key, now_room_key
|
||||
# We now work out if any other users have since joined or left the rooms
|
||||
# the user is currently in.
|
||||
|
||||
# List of membership changes per room
|
||||
room_to_deltas: Dict[str, List[StateDelta]] = {}
|
||||
# The set of event IDs of membership events (so we can fetch their
|
||||
# associated membership).
|
||||
memberships_to_fetch: Set[str] = set()
|
||||
|
||||
# TODO: Only pull out membership events?
|
||||
state_changes = await self.store.get_current_state_deltas_for_rooms(
|
||||
joined_room_ids, from_token=from_token.room_key, to_token=now_token.room_key
|
||||
)
|
||||
rooms_changed.update(event.room_id for event in member_events)
|
||||
|
||||
stream_ordering = from_token.room_key.stream
|
||||
|
||||
possibly_changed = set(changed)
|
||||
possibly_left = set()
|
||||
for room_id in rooms_changed:
|
||||
# Check if the forward extremities have changed. If not then we know
|
||||
# the current state won't have changed, and so we can skip this room.
|
||||
try:
|
||||
if not await self.store.have_room_forward_extremities_changed_since(
|
||||
room_id, stream_ordering
|
||||
):
|
||||
continue
|
||||
except errors.StoreError:
|
||||
pass
|
||||
|
||||
current_state_ids = await self._state_storage.get_current_state_ids(
|
||||
room_id, await_full_state=False
|
||||
)
|
||||
|
||||
# The user may have left the room
|
||||
# TODO: Check if they actually did or if we were just invited.
|
||||
if room_id not in room_ids:
|
||||
for etype, state_key in current_state_ids.keys():
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
possibly_left.add(state_key)
|
||||
for delta in state_changes:
|
||||
if delta.event_type != EventTypes.Member:
|
||||
continue
|
||||
|
||||
# Fetch the current state at the time.
|
||||
try:
|
||||
event_ids = await self.store.get_forward_extremities_for_room_at_stream_ordering(
|
||||
room_id, stream_ordering=stream_ordering
|
||||
)
|
||||
except errors.StoreError:
|
||||
# we have purged the stream_ordering index since the stream
|
||||
# ordering: treat it the same as a new room
|
||||
event_ids = []
|
||||
room_to_deltas.setdefault(delta.room_id, []).append(delta)
|
||||
if delta.event_id:
|
||||
memberships_to_fetch.add(delta.event_id)
|
||||
if delta.prev_event_id:
|
||||
memberships_to_fetch.add(delta.prev_event_id)
|
||||
|
||||
# special-case for an empty prev state: include all members
|
||||
# in the changed list
|
||||
if not event_ids:
|
||||
log_kv(
|
||||
{"event": "encountered empty previous state", "room_id": room_id}
|
||||
)
|
||||
for etype, state_key in current_state_ids.keys():
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
possibly_changed.add(state_key)
|
||||
continue
|
||||
# Fetch all the memberships for the membership events
|
||||
event_id_to_memberships = await self.store.get_membership_from_event_ids(
|
||||
memberships_to_fetch
|
||||
)
|
||||
|
||||
current_member_id = current_state_ids.get((EventTypes.Member, user_id))
|
||||
if not current_member_id:
|
||||
continue
|
||||
joined_invited_knocked = (
|
||||
Membership.JOIN,
|
||||
Membership.INVITE,
|
||||
Membership.KNOCK,
|
||||
)
|
||||
|
||||
# mapping from event_id -> state_dict
|
||||
prev_state_ids = await self._state_storage.get_state_ids_for_events(
|
||||
event_ids,
|
||||
await_full_state=False,
|
||||
)
|
||||
# We now want to find any user that have newly joined/invited/knocked,
|
||||
# or newly left, similarly to above.
|
||||
newly_joined_or_invited_or_knocked_users: Set[str] = set()
|
||||
newly_left_users: Set[str] = set()
|
||||
for _, deltas in room_to_deltas.items():
|
||||
for delta in deltas:
|
||||
# Get the prev/new memberships for the delta
|
||||
new_membership = None
|
||||
prev_membership = None
|
||||
if delta.event_id:
|
||||
m = event_id_to_memberships.get(delta.event_id)
|
||||
if m is not None:
|
||||
new_membership = m.membership
|
||||
if delta.prev_event_id:
|
||||
m = event_id_to_memberships.get(delta.prev_event_id)
|
||||
if m is not None:
|
||||
prev_membership = m.membership
|
||||
|
||||
# Check if we've joined the room? If so we just blindly add all the users to
|
||||
# the "possibly changed" users.
|
||||
for state_dict in prev_state_ids.values():
|
||||
member_event = state_dict.get((EventTypes.Member, user_id), None)
|
||||
if not member_event or member_event != current_member_id:
|
||||
for etype, state_key in current_state_ids.keys():
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
possibly_changed.add(state_key)
|
||||
break
|
||||
# Check if a user has newly joined/invited/knocked, or left.
|
||||
if new_membership in joined_invited_knocked:
|
||||
if prev_membership not in joined_invited_knocked:
|
||||
newly_joined_or_invited_or_knocked_users.add(delta.state_key)
|
||||
newly_left_users.discard(delta.state_key)
|
||||
elif prev_membership in joined_invited_knocked:
|
||||
newly_joined_or_invited_or_knocked_users.discard(delta.state_key)
|
||||
newly_left_users.add(delta.state_key)
|
||||
|
||||
# If there has been any change in membership, include them in the
|
||||
# possibly changed list. We'll check if they are joined below,
|
||||
# and we're not toooo worried about spuriously adding users.
|
||||
for key, event_id in current_state_ids.items():
|
||||
etype, state_key = key
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
# Now we actually calculate the device list entry with the information
|
||||
# calculated above.
|
||||
device_list_updates = await self.generate_sync_entry_for_device_list(
|
||||
user_id=user_id,
|
||||
since_token=from_token,
|
||||
now_token=now_token,
|
||||
joined_room_ids=joined_room_ids,
|
||||
newly_joined_rooms=newly_joined_rooms,
|
||||
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
|
||||
newly_left_rooms=newly_left_rooms,
|
||||
newly_left_users=newly_left_users,
|
||||
)
|
||||
|
||||
# check if this member has changed since any of the extremities
|
||||
# at the stream_ordering, and add them to the list if so.
|
||||
for state_dict in prev_state_ids.values():
|
||||
prev_event_id = state_dict.get(key, None)
|
||||
if not prev_event_id or prev_event_id != event_id:
|
||||
if state_key != user_id:
|
||||
possibly_changed.add(state_key)
|
||||
break
|
||||
log_kv(
|
||||
{
|
||||
"changed": device_list_updates.changed,
|
||||
"left": device_list_updates.left,
|
||||
}
|
||||
)
|
||||
|
||||
if possibly_changed or possibly_left:
|
||||
possibly_joined = possibly_changed
|
||||
possibly_left = possibly_changed | possibly_left
|
||||
return device_list_updates
|
||||
|
||||
# Double check if we still share rooms with the given user.
|
||||
users_rooms = await self.store.get_rooms_for_users(possibly_left)
|
||||
for changed_user_id, entries in users_rooms.items():
|
||||
if any(rid in room_ids for rid in entries):
|
||||
possibly_left.discard(changed_user_id)
|
||||
else:
|
||||
possibly_joined.discard(changed_user_id)
|
||||
@measure_func("_generate_sync_entry_for_device_list")
|
||||
async def generate_sync_entry_for_device_list(
|
||||
self,
|
||||
user_id: str,
|
||||
since_token: StreamToken,
|
||||
now_token: StreamToken,
|
||||
joined_room_ids: AbstractSet[str],
|
||||
newly_joined_rooms: AbstractSet[str],
|
||||
newly_joined_or_invited_or_knocked_users: AbstractSet[str],
|
||||
newly_left_rooms: AbstractSet[str],
|
||||
newly_left_users: AbstractSet[str],
|
||||
) -> DeviceListUpdates:
|
||||
"""Generate the DeviceListUpdates section of sync
|
||||
|
||||
else:
|
||||
possibly_joined = set()
|
||||
possibly_left = set()
|
||||
Args:
|
||||
sync_result_builder
|
||||
newly_joined_rooms: Set of rooms user has joined since previous sync
|
||||
newly_joined_or_invited_or_knocked_users: Set of users that have joined,
|
||||
been invited to a room or are knocking on a room since
|
||||
previous sync.
|
||||
newly_left_rooms: Set of rooms user has left since previous sync
|
||||
newly_left_users: Set of users that have left a room we're in since
|
||||
previous sync
|
||||
"""
|
||||
# Take a copy since these fields will be mutated later.
|
||||
newly_joined_or_invited_or_knocked_users = set(
|
||||
newly_joined_or_invited_or_knocked_users
|
||||
)
|
||||
newly_left_users = set(newly_left_users)
|
||||
|
||||
result = {"changed": list(possibly_joined), "left": list(possibly_left)}
|
||||
# We want to figure out what user IDs the client should refetch
|
||||
# device keys for, and which users we aren't going to track changes
|
||||
# for anymore.
|
||||
#
|
||||
# For the first step we check:
|
||||
# a. if any users we share a room with have updated their devices,
|
||||
# and
|
||||
# b. we also check if we've joined any new rooms, or if a user has
|
||||
# joined a room we're in.
|
||||
#
|
||||
# For the second step we just find any users we no longer share a
|
||||
# room with by looking at all users that have left a room plus users
|
||||
# that were in a room we've left.
|
||||
|
||||
log_kv(result)
|
||||
users_that_have_changed = set()
|
||||
|
||||
return result
|
||||
# Step 1a, check for changes in devices of users we share a room
|
||||
# with
|
||||
users_that_have_changed = await self.get_device_changes_in_shared_rooms(
|
||||
user_id,
|
||||
joined_room_ids,
|
||||
from_token=since_token,
|
||||
now_token=now_token,
|
||||
)
|
||||
|
||||
# Step 1b, check for newly joined rooms
|
||||
for room_id in newly_joined_rooms:
|
||||
joined_users = await self.store.get_users_in_room(room_id)
|
||||
newly_joined_or_invited_or_knocked_users.update(joined_users)
|
||||
|
||||
# TODO: Check that these users are actually new, i.e. either they
|
||||
# weren't in the previous sync *or* they left and rejoined.
|
||||
users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
|
||||
|
||||
user_signatures_changed = await self.store.get_users_whose_signatures_changed(
|
||||
user_id, since_token.device_list_key
|
||||
)
|
||||
users_that_have_changed.update(user_signatures_changed)
|
||||
|
||||
# Now find users that we no longer track
|
||||
for room_id in newly_left_rooms:
|
||||
left_users = await self.store.get_users_in_room(room_id)
|
||||
newly_left_users.update(left_users)
|
||||
|
||||
# Remove any users that we still share a room with.
|
||||
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
|
||||
for user_id, entries in left_users_rooms.items():
|
||||
if any(rid in joined_room_ids for rid in entries):
|
||||
newly_left_users.discard(user_id)
|
||||
|
||||
return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)
|
||||
|
||||
async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
|
||||
if not self.hs.is_mine(UserID.from_string(user_id)):
|
||||
|
||||
@@ -291,13 +291,20 @@ class E2eKeysHandler:
|
||||
|
||||
# Only try and fetch keys for destinations that are not marked as
|
||||
# down.
|
||||
filtered_destinations = await filter_destinations_by_retry_limiter(
|
||||
remote_queries_not_in_cache.keys(),
|
||||
self.clock,
|
||||
self.store,
|
||||
# Let's give an arbitrary grace period for those hosts that are
|
||||
# only recently down
|
||||
retry_due_within_ms=60 * 1000,
|
||||
unfiltered_destinations = remote_queries_not_in_cache.keys()
|
||||
filtered_destinations = set(
|
||||
await filter_destinations_by_retry_limiter(
|
||||
unfiltered_destinations,
|
||||
self.clock,
|
||||
self.store,
|
||||
# Let's give an arbitrary grace period for those hosts that are
|
||||
# only recently down
|
||||
retry_due_within_ms=60 * 1000,
|
||||
)
|
||||
)
|
||||
failures.update(
|
||||
(dest, _NOT_READY_FOR_RETRY_FAILURE)
|
||||
for dest in (unfiltered_destinations - filtered_destinations)
|
||||
)
|
||||
|
||||
await concurrently_execute(
|
||||
@@ -1641,6 +1648,9 @@ def _check_device_signature(
|
||||
raise SynapseError(400, "Invalid signature", Codes.INVALID_SIGNATURE)
|
||||
|
||||
|
||||
_NOT_READY_FOR_RETRY_FAILURE = {"status": 503, "message": "Not ready for retry"}
|
||||
|
||||
|
||||
def _exception_to_failure(e: Exception) -> JsonDict:
|
||||
if isinstance(e, SynapseError):
|
||||
return {"status": e.code, "errcode": e.errcode, "message": str(e)}
|
||||
@@ -1649,7 +1659,7 @@ def _exception_to_failure(e: Exception) -> JsonDict:
|
||||
return {"status": e.code, "message": str(e)}
|
||||
|
||||
if isinstance(e, NotRetryingDestination):
|
||||
return {"status": 503, "message": "Not ready for retry"}
|
||||
return _NOT_READY_FOR_RETRY_FAILURE
|
||||
|
||||
# include ConnectionRefused and other errors
|
||||
#
|
||||
|
||||
@@ -34,7 +34,7 @@ from synapse.api.errors import (
|
||||
from synapse.logging.opentracing import log_kv, trace
|
||||
from synapse.storage.databases.main.e2e_room_keys import RoomKey
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.async_helpers import ReadWriteLock
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -58,7 +58,7 @@ class E2eRoomKeysHandler:
|
||||
# clients belonging to a user will receive and try to upload a new session at
|
||||
# roughly the same time. Also used to lock out uploads when the key is being
|
||||
# changed.
|
||||
self._upload_linearizer = Linearizer("upload_room_keys_lock")
|
||||
self._upload_lock = ReadWriteLock()
|
||||
|
||||
@trace
|
||||
async def get_room_keys(
|
||||
@@ -89,7 +89,7 @@ class E2eRoomKeysHandler:
|
||||
|
||||
# we deliberately take the lock to get keys so that changing the version
|
||||
# works atomically
|
||||
async with self._upload_linearizer.queue(user_id):
|
||||
async with self._upload_lock.read(user_id):
|
||||
# make sure the backup version exists
|
||||
try:
|
||||
await self.store.get_e2e_room_keys_version_info(user_id, version)
|
||||
@@ -132,7 +132,7 @@ class E2eRoomKeysHandler:
|
||||
"""
|
||||
|
||||
# lock for consistency with uploading
|
||||
async with self._upload_linearizer.queue(user_id):
|
||||
async with self._upload_lock.write(user_id):
|
||||
# make sure the backup version exists
|
||||
try:
|
||||
version_info = await self.store.get_e2e_room_keys_version_info(
|
||||
@@ -193,7 +193,7 @@ class E2eRoomKeysHandler:
|
||||
# TODO: Validate the JSON to make sure it has the right keys.
|
||||
|
||||
# XXX: perhaps we should use a finer grained lock here?
|
||||
async with self._upload_linearizer.queue(user_id):
|
||||
async with self._upload_lock.write(user_id):
|
||||
# Check that the version we're trying to upload is the current version
|
||||
try:
|
||||
version_info = await self.store.get_e2e_room_keys_version_info(user_id)
|
||||
@@ -355,7 +355,7 @@ class E2eRoomKeysHandler:
|
||||
# TODO: Validate the JSON to make sure it has the right keys.
|
||||
|
||||
# lock everyone out until we've switched version
|
||||
async with self._upload_linearizer.queue(user_id):
|
||||
async with self._upload_lock.write(user_id):
|
||||
new_version = await self.store.create_e2e_room_keys_version(
|
||||
user_id, version_info
|
||||
)
|
||||
@@ -382,7 +382,7 @@ class E2eRoomKeysHandler:
|
||||
}
|
||||
"""
|
||||
|
||||
async with self._upload_linearizer.queue(user_id):
|
||||
async with self._upload_lock.read(user_id):
|
||||
try:
|
||||
res = await self.store.get_e2e_room_keys_version_info(user_id, version)
|
||||
except StoreError as e:
|
||||
@@ -407,7 +407,7 @@ class E2eRoomKeysHandler:
|
||||
NotFoundError: if this backup version doesn't exist
|
||||
"""
|
||||
|
||||
async with self._upload_linearizer.queue(user_id):
|
||||
async with self._upload_lock.write(user_id):
|
||||
try:
|
||||
await self.store.delete_e2e_room_keys_version(user_id, version)
|
||||
except StoreError as e:
|
||||
@@ -437,7 +437,7 @@ class E2eRoomKeysHandler:
|
||||
raise SynapseError(
|
||||
400, "Version in body does not match", Codes.INVALID_PARAM
|
||||
)
|
||||
async with self._upload_linearizer.queue(user_id):
|
||||
async with self._upload_lock.write(user_id):
|
||||
try:
|
||||
old_info = await self.store.get_e2e_room_keys_version_info(
|
||||
user_id, version
|
||||
|
||||
@@ -507,13 +507,15 @@ class PaginationHandler:
|
||||
|
||||
# Initially fetch the events from the database. With any luck, we can return
|
||||
# these without blocking on backfill (handled below).
|
||||
events, next_key = await self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_room_key,
|
||||
direction=pagin_config.direction,
|
||||
limit=pagin_config.limit,
|
||||
event_filter=event_filter,
|
||||
events, next_key = (
|
||||
await self.store.paginate_room_events_by_topological_ordering(
|
||||
room_id=room_id,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_room_key,
|
||||
direction=pagin_config.direction,
|
||||
limit=pagin_config.limit,
|
||||
event_filter=event_filter,
|
||||
)
|
||||
)
|
||||
|
||||
if pagin_config.direction == Direction.BACKWARDS:
|
||||
@@ -582,13 +584,15 @@ class PaginationHandler:
|
||||
# If we did backfill something, refetch the events from the database to
|
||||
# catch anything new that might have been added since we last fetched.
|
||||
if did_backfill:
|
||||
events, next_key = await self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_room_key,
|
||||
direction=pagin_config.direction,
|
||||
limit=pagin_config.limit,
|
||||
event_filter=event_filter,
|
||||
events, next_key = (
|
||||
await self.store.paginate_room_events_by_topological_ordering(
|
||||
room_id=room_id,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_room_key,
|
||||
direction=pagin_config.direction,
|
||||
limit=pagin_config.limit,
|
||||
event_filter=event_filter,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Otherwise, we can backfill in the background for eventual
|
||||
|
||||
@@ -74,6 +74,17 @@ class ProfileHandler:
|
||||
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
|
||||
|
||||
async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict:
|
||||
"""
|
||||
Get a user's profile as a JSON dictionary.
|
||||
|
||||
Args:
|
||||
user_id: The user to fetch the profile of.
|
||||
ignore_backoff: True to ignore backoff when fetching over federation.
|
||||
|
||||
Returns:
|
||||
A JSON dictionary. For local queries this will include the displayname and avatar_url
|
||||
fields. For remote queries it may contain arbitrary information.
|
||||
"""
|
||||
target_user = UserID.from_string(user_id)
|
||||
|
||||
if self.hs.is_mine(target_user):
|
||||
@@ -107,6 +118,15 @@ class ProfileHandler:
|
||||
raise e.to_synapse_error()
|
||||
|
||||
async def get_displayname(self, target_user: UserID) -> Optional[str]:
|
||||
"""
|
||||
Fetch a user's display name from their profile.
|
||||
|
||||
Args:
|
||||
target_user: The user to fetch the display name of.
|
||||
|
||||
Returns:
|
||||
The user's display name or None if unset.
|
||||
"""
|
||||
if self.hs.is_mine(target_user):
|
||||
try:
|
||||
displayname = await self.store.get_profile_displayname(target_user)
|
||||
@@ -203,6 +223,15 @@ class ProfileHandler:
|
||||
await self._update_join_states(requester, target_user)
|
||||
|
||||
async def get_avatar_url(self, target_user: UserID) -> Optional[str]:
|
||||
"""
|
||||
Fetch a user's avatar URL from their profile.
|
||||
|
||||
Args:
|
||||
target_user: The user to fetch the avatar URL of.
|
||||
|
||||
Returns:
|
||||
The user's avatar URL or None if unset.
|
||||
"""
|
||||
if self.hs.is_mine(target_user):
|
||||
try:
|
||||
avatar_url = await self.store.get_profile_avatar_url(target_user)
|
||||
@@ -403,6 +432,12 @@ class ProfileHandler:
|
||||
async def _update_join_states(
|
||||
self, requester: Requester, target_user: UserID
|
||||
) -> None:
|
||||
"""
|
||||
Update the membership events of each room the user is joined to with the
|
||||
new profile information.
|
||||
|
||||
Note that this stomps over any custom display name or avatar URL in member events.
|
||||
"""
|
||||
if not self.hs.is_mine(target_user):
|
||||
return
|
||||
|
||||
|
||||
@@ -286,8 +286,14 @@ class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
|
||||
room_ids: Iterable[str],
|
||||
is_guest: bool,
|
||||
explicit_room_id: Optional[str] = None,
|
||||
to_key: Optional[MultiWriterStreamToken] = None,
|
||||
) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
|
||||
to_key = self.get_current_key()
|
||||
"""
|
||||
Find read receipts for given rooms (> `from_token` and <= `to_token`)
|
||||
"""
|
||||
|
||||
if to_key is None:
|
||||
to_key = self.get_current_key()
|
||||
|
||||
if from_key == to_key:
|
||||
return [], to_key
|
||||
|
||||
@@ -1188,6 +1188,8 @@ class RoomCreationHandler:
|
||||
)
|
||||
events_to_send.append((power_event, power_context))
|
||||
else:
|
||||
# Please update the docs for `default_power_level_content_override` when
|
||||
# updating the `events` dict below
|
||||
power_level_content: JsonDict = {
|
||||
"users": {creator_id: 100},
|
||||
"users_default": 0,
|
||||
@@ -1748,7 +1750,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
|
||||
from_key=from_key,
|
||||
to_key=to_key,
|
||||
limit=limit or 10,
|
||||
order="ASC",
|
||||
direction=Direction.FORWARDS,
|
||||
)
|
||||
|
||||
events = list(room_events)
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -293,7 +293,9 @@ class StatsHandler:
|
||||
"history_visibility"
|
||||
)
|
||||
elif delta.event_type == EventTypes.RoomEncryption:
|
||||
room_state["encryption"] = event_content.get("algorithm")
|
||||
room_state["encryption"] = event_content.get(
|
||||
EventContentFields.ENCRYPTION_ALGORITHM
|
||||
)
|
||||
elif delta.event_type == EventTypes.Name:
|
||||
room_state["name"] = event_content.get("name")
|
||||
elif delta.event_type == EventTypes.Topic:
|
||||
|
||||
@@ -43,6 +43,7 @@ from prometheus_client import Counter
|
||||
|
||||
from synapse.api.constants import (
|
||||
AccountDataTypes,
|
||||
Direction,
|
||||
EventContentFields,
|
||||
EventTypes,
|
||||
JoinRules,
|
||||
@@ -64,6 +65,7 @@ from synapse.logging.opentracing import (
|
||||
)
|
||||
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
|
||||
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
|
||||
from synapse.storage.databases.main.stream import PaginateFunction
|
||||
from synapse.storage.roommember import MemberSummary
|
||||
from synapse.types import (
|
||||
DeviceListUpdates,
|
||||
@@ -84,7 +86,7 @@ from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
|
||||
from synapse.util.metrics import Measure, measure_func
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -879,22 +881,49 @@ class SyncHandler:
|
||||
since_key = since_token.room_key
|
||||
|
||||
while limited and len(recents) < timeline_limit and max_repeat:
|
||||
# If we have a since_key then we are trying to get any events
|
||||
# that have happened since `since_key` up to `end_key`, so we
|
||||
# can just use `get_room_events_stream_for_room`.
|
||||
# Otherwise, we want to return the last N events in the room
|
||||
# in topological ordering.
|
||||
if since_key:
|
||||
events, end_key = await self.store.get_room_events_stream_for_room(
|
||||
room_id,
|
||||
limit=load_limit + 1,
|
||||
from_key=since_key,
|
||||
to_key=end_key,
|
||||
)
|
||||
else:
|
||||
events, end_key = await self.store.get_recent_events_for_room(
|
||||
room_id, limit=load_limit + 1, end_token=end_key
|
||||
)
|
||||
# For initial `/sync`, we want to view a historical section of the
|
||||
# timeline; to fetch events by `topological_ordering` (best
|
||||
# representation of the room DAG as others were seeing it at the time).
|
||||
# This also aligns with the order that `/messages` returns events in.
|
||||
#
|
||||
# For incremental `/sync`, we want to get all updates for rooms since
|
||||
# the last `/sync` (regardless if those updates arrived late or happened
|
||||
# a while ago in the past); to fetch events by `stream_ordering` (in the
|
||||
# order they were received by the server).
|
||||
#
|
||||
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
|
||||
#
|
||||
# FIXME: Using workaround for mypy,
|
||||
# https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
|
||||
# https://github.com/python/mypy/issues/17479
|
||||
paginate_room_events_by_topological_ordering: PaginateFunction = (
|
||||
self.store.paginate_room_events_by_topological_ordering
|
||||
)
|
||||
paginate_room_events_by_stream_ordering: PaginateFunction = (
|
||||
self.store.paginate_room_events_by_stream_ordering
|
||||
)
|
||||
pagination_method: PaginateFunction = (
|
||||
# Use `topographical_ordering` for historical events
|
||||
paginate_room_events_by_topological_ordering
|
||||
if since_key is None
|
||||
# Use `stream_ordering` for updates
|
||||
else paginate_room_events_by_stream_ordering
|
||||
)
|
||||
events, end_key = await pagination_method(
|
||||
room_id=room_id,
|
||||
# The bounds are reversed so we can paginate backwards
|
||||
# (from newer to older events) starting at to_bound.
|
||||
# This ensures we fill the `limit` with the newest events first,
|
||||
from_key=end_key,
|
||||
to_key=since_key,
|
||||
direction=Direction.BACKWARDS,
|
||||
# We add one so we can determine if there are enough events to saturate
|
||||
# the limit or not (see `limited`)
|
||||
limit=load_limit + 1,
|
||||
)
|
||||
# We want to return the events in ascending order (the last event is the
|
||||
# most recent).
|
||||
events.reverse()
|
||||
|
||||
log_kv({"loaded_recents": len(events)})
|
||||
|
||||
@@ -1750,8 +1779,15 @@ class SyncHandler:
|
||||
)
|
||||
|
||||
if include_device_list_updates:
|
||||
device_lists = await self._generate_sync_entry_for_device_list(
|
||||
sync_result_builder,
|
||||
# include_device_list_updates can only be True if we have a
|
||||
# since token.
|
||||
assert since_token is not None
|
||||
|
||||
device_lists = await self._device_handler.generate_sync_entry_for_device_list(
|
||||
user_id=user_id,
|
||||
since_token=since_token,
|
||||
now_token=sync_result_builder.now_token,
|
||||
joined_room_ids=sync_result_builder.joined_room_ids,
|
||||
newly_joined_rooms=newly_joined_rooms,
|
||||
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
|
||||
newly_left_rooms=newly_left_rooms,
|
||||
@@ -1863,8 +1899,14 @@ class SyncHandler:
|
||||
newly_left_users,
|
||||
) = sync_result_builder.calculate_user_changes()
|
||||
|
||||
device_lists = await self._generate_sync_entry_for_device_list(
|
||||
sync_result_builder,
|
||||
# include_device_list_updates can only be True if we have a
|
||||
# since token.
|
||||
assert since_token is not None
|
||||
device_lists = await self._device_handler.generate_sync_entry_for_device_list(
|
||||
user_id=user_id,
|
||||
since_token=since_token,
|
||||
now_token=sync_result_builder.now_token,
|
||||
joined_room_ids=sync_result_builder.joined_room_ids,
|
||||
newly_joined_rooms=newly_joined_rooms,
|
||||
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
|
||||
newly_left_rooms=newly_left_rooms,
|
||||
@@ -2041,94 +2083,6 @@ class SyncHandler:
|
||||
|
||||
return sync_result_builder
|
||||
|
||||
@measure_func("_generate_sync_entry_for_device_list")
|
||||
async def _generate_sync_entry_for_device_list(
|
||||
self,
|
||||
sync_result_builder: "SyncResultBuilder",
|
||||
newly_joined_rooms: AbstractSet[str],
|
||||
newly_joined_or_invited_or_knocked_users: AbstractSet[str],
|
||||
newly_left_rooms: AbstractSet[str],
|
||||
newly_left_users: AbstractSet[str],
|
||||
) -> DeviceListUpdates:
|
||||
"""Generate the DeviceListUpdates section of sync
|
||||
|
||||
Args:
|
||||
sync_result_builder
|
||||
newly_joined_rooms: Set of rooms user has joined since previous sync
|
||||
newly_joined_or_invited_or_knocked_users: Set of users that have joined,
|
||||
been invited to a room or are knocking on a room since
|
||||
previous sync.
|
||||
newly_left_rooms: Set of rooms user has left since previous sync
|
||||
newly_left_users: Set of users that have left a room we're in since
|
||||
previous sync
|
||||
"""
|
||||
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
since_token = sync_result_builder.since_token
|
||||
assert since_token is not None
|
||||
|
||||
# Take a copy since these fields will be mutated later.
|
||||
newly_joined_or_invited_or_knocked_users = set(
|
||||
newly_joined_or_invited_or_knocked_users
|
||||
)
|
||||
newly_left_users = set(newly_left_users)
|
||||
|
||||
# We want to figure out what user IDs the client should refetch
|
||||
# device keys for, and which users we aren't going to track changes
|
||||
# for anymore.
|
||||
#
|
||||
# For the first step we check:
|
||||
# a. if any users we share a room with have updated their devices,
|
||||
# and
|
||||
# b. we also check if we've joined any new rooms, or if a user has
|
||||
# joined a room we're in.
|
||||
#
|
||||
# For the second step we just find any users we no longer share a
|
||||
# room with by looking at all users that have left a room plus users
|
||||
# that were in a room we've left.
|
||||
|
||||
users_that_have_changed = set()
|
||||
|
||||
joined_room_ids = sync_result_builder.joined_room_ids
|
||||
|
||||
# Step 1a, check for changes in devices of users we share a room
|
||||
# with
|
||||
users_that_have_changed = (
|
||||
await self._device_handler.get_device_changes_in_shared_rooms(
|
||||
user_id,
|
||||
joined_room_ids,
|
||||
from_token=since_token,
|
||||
now_token=sync_result_builder.now_token,
|
||||
)
|
||||
)
|
||||
|
||||
# Step 1b, check for newly joined rooms
|
||||
for room_id in newly_joined_rooms:
|
||||
joined_users = await self.store.get_users_in_room(room_id)
|
||||
newly_joined_or_invited_or_knocked_users.update(joined_users)
|
||||
|
||||
# TODO: Check that these users are actually new, i.e. either they
|
||||
# weren't in the previous sync *or* they left and rejoined.
|
||||
users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
|
||||
|
||||
user_signatures_changed = await self.store.get_users_whose_signatures_changed(
|
||||
user_id, since_token.device_list_key
|
||||
)
|
||||
users_that_have_changed.update(user_signatures_changed)
|
||||
|
||||
# Now find users that we no longer track
|
||||
for room_id in newly_left_rooms:
|
||||
left_users = await self.store.get_users_in_room(room_id)
|
||||
newly_left_users.update(left_users)
|
||||
|
||||
# Remove any users that we still share a room with.
|
||||
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
|
||||
for user_id, entries in left_users_rooms.items():
|
||||
if any(rid in joined_room_ids for rid in entries):
|
||||
newly_left_users.discard(user_id)
|
||||
|
||||
return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)
|
||||
|
||||
@trace
|
||||
async def _generate_sync_entry_for_to_device(
|
||||
self, sync_result_builder: "SyncResultBuilder"
|
||||
@@ -2270,7 +2224,11 @@ class SyncHandler:
|
||||
user=user,
|
||||
from_key=presence_key,
|
||||
is_guest=sync_config.is_guest,
|
||||
include_offline=include_offline,
|
||||
include_offline=(
|
||||
True
|
||||
if self.hs_config.server.presence_include_offline_users_on_sync
|
||||
else include_offline
|
||||
),
|
||||
)
|
||||
assert presence_key
|
||||
sync_result_builder.now_token = now_token.copy_and_replace(
|
||||
@@ -2637,9 +2595,10 @@ class SyncHandler:
|
||||
# a "gap" in the timeline, as described by the spec for /sync.
|
||||
room_to_events = await self.store.get_room_events_stream_for_rooms(
|
||||
room_ids=sync_result_builder.joined_room_ids,
|
||||
from_key=since_token.room_key,
|
||||
to_key=now_token.room_key,
|
||||
from_key=now_token.room_key,
|
||||
to_key=since_token.room_key,
|
||||
limit=timeline_limit + 1,
|
||||
direction=Direction.BACKWARDS,
|
||||
)
|
||||
|
||||
# We loop through all room ids, even if there are no new events, in case
|
||||
@@ -2650,6 +2609,9 @@ class SyncHandler:
|
||||
newly_joined = room_id in newly_joined_rooms
|
||||
if room_entry:
|
||||
events, start_key = room_entry
|
||||
# We want to return the events in ascending order (the last event is the
|
||||
# most recent).
|
||||
events.reverse()
|
||||
|
||||
prev_batch_token = now_token.copy_and_replace(
|
||||
StreamKeyType.ROOM, start_key
|
||||
|
||||
@@ -565,7 +565,12 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
|
||||
room_ids: Iterable[str],
|
||||
is_guest: bool,
|
||||
explicit_room_id: Optional[str] = None,
|
||||
to_key: Optional[int] = None,
|
||||
) -> Tuple[List[JsonMapping], int]:
|
||||
"""
|
||||
Find typing notifications for given rooms (> `from_token` and <= `to_token`)
|
||||
"""
|
||||
|
||||
with Measure(self.clock, "typing.get_new_events"):
|
||||
from_key = int(from_key)
|
||||
handler = self.get_typing_handler()
|
||||
@@ -574,7 +579,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
|
||||
for room_id in room_ids:
|
||||
if room_id not in handler._room_serials:
|
||||
continue
|
||||
if handler._room_serials[room_id] <= from_key:
|
||||
if handler._room_serials[room_id] <= from_key or (
|
||||
to_key is not None and handler._room_serials[room_id] > to_key
|
||||
):
|
||||
continue
|
||||
|
||||
events.append(self._make_event_for(room_id))
|
||||
|
||||
@@ -1057,11 +1057,11 @@ class _MultipartParserProtocol(protocol.Protocol):
|
||||
if not self.parser:
|
||||
|
||||
def on_header_field(data: bytes, start: int, end: int) -> None:
|
||||
if data[start:end] == b"Location":
|
||||
if data[start:end].lower() == b"location":
|
||||
self.has_redirect = True
|
||||
if data[start:end] == b"Content-Disposition":
|
||||
if data[start:end].lower() == b"content-disposition":
|
||||
self.in_disposition = True
|
||||
if data[start:end] == b"Content-Type":
|
||||
if data[start:end].lower() == b"content-type":
|
||||
self.in_content_type = True
|
||||
|
||||
def on_header_value(data: bytes, start: int, end: int) -> None:
|
||||
@@ -1088,7 +1088,6 @@ class _MultipartParserProtocol(protocol.Protocol):
|
||||
return
|
||||
# otherwise we are in the file part
|
||||
else:
|
||||
logger.info("Writing multipart file data to stream")
|
||||
try:
|
||||
self.stream.write(data[start:end])
|
||||
except Exception as e:
|
||||
|
||||
@@ -90,7 +90,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.opentracing import set_tag, start_active_span, tags
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import json_decoder
|
||||
from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
|
||||
from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.stringutils import parse_and_validate_server_name
|
||||
|
||||
@@ -475,6 +475,8 @@ class MatrixFederationHttpClient:
|
||||
use_proxy=True,
|
||||
)
|
||||
|
||||
self.remote_download_linearizer = Linearizer("remote_download_linearizer", 6)
|
||||
|
||||
def wake_destination(self, destination: str) -> None:
|
||||
"""Called when the remote server may have come back online."""
|
||||
|
||||
@@ -1486,35 +1488,44 @@ class MatrixFederationHttpClient:
|
||||
)
|
||||
|
||||
headers = dict(response.headers.getAllRawHeaders())
|
||||
|
||||
expected_size = response.length
|
||||
# if we don't get an expected length then use the max length
|
||||
|
||||
if expected_size == UNKNOWN_LENGTH:
|
||||
expected_size = max_size
|
||||
logger.debug(
|
||||
f"File size unknown, assuming file is max allowable size: {max_size}"
|
||||
)
|
||||
else:
|
||||
if int(expected_size) > max_size:
|
||||
msg = "Requested file is too large > %r bytes" % (max_size,)
|
||||
logger.warning(
|
||||
"{%s} [%s] %s",
|
||||
request.txn_id,
|
||||
request.destination,
|
||||
msg,
|
||||
)
|
||||
raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
|
||||
|
||||
read_body, _ = await download_ratelimiter.can_do_action(
|
||||
requester=None,
|
||||
key=ip_address,
|
||||
n_actions=expected_size,
|
||||
)
|
||||
if not read_body:
|
||||
msg = "Requested file size exceeds ratelimits"
|
||||
logger.warning(
|
||||
"{%s} [%s] %s",
|
||||
request.txn_id,
|
||||
request.destination,
|
||||
msg,
|
||||
read_body, _ = await download_ratelimiter.can_do_action(
|
||||
requester=None,
|
||||
key=ip_address,
|
||||
n_actions=expected_size,
|
||||
)
|
||||
raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
|
||||
if not read_body:
|
||||
msg = "Requested file size exceeds ratelimits"
|
||||
logger.warning(
|
||||
"{%s} [%s] %s",
|
||||
request.txn_id,
|
||||
request.destination,
|
||||
msg,
|
||||
)
|
||||
raise SynapseError(
|
||||
HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
|
||||
)
|
||||
|
||||
try:
|
||||
# add a byte of headroom to max size as function errs at >=
|
||||
d = read_body_with_max_size(response, output_stream, expected_size + 1)
|
||||
d.addTimeout(self.default_timeout_seconds, self.reactor)
|
||||
length = await make_deferred_yieldable(d)
|
||||
async with self.remote_download_linearizer.queue(ip_address):
|
||||
# add a byte of headroom to max size as function errs at >=
|
||||
d = read_body_with_max_size(response, output_stream, expected_size + 1)
|
||||
d.addTimeout(self.default_timeout_seconds, self.reactor)
|
||||
length = await make_deferred_yieldable(d)
|
||||
except BodyExceededMaxSize:
|
||||
msg = "Requested file is too large > %r bytes" % (expected_size,)
|
||||
logger.warning(
|
||||
@@ -1560,6 +1571,13 @@ class MatrixFederationHttpClient:
|
||||
request.method,
|
||||
request.uri.decode("ascii"),
|
||||
)
|
||||
|
||||
# if we didn't know the length upfront, decrement the actual size from ratelimiter
|
||||
if response.length == UNKNOWN_LENGTH:
|
||||
download_ratelimiter.record_action(
|
||||
requester=None, key=ip_address, n_actions=length
|
||||
)
|
||||
|
||||
return length, headers
|
||||
|
||||
async def federation_get_file(
|
||||
@@ -1630,29 +1648,37 @@ class MatrixFederationHttpClient:
|
||||
)
|
||||
|
||||
headers = dict(response.headers.getAllRawHeaders())
|
||||
|
||||
expected_size = response.length
|
||||
# if we don't get an expected length then use the max length
|
||||
|
||||
if expected_size == UNKNOWN_LENGTH:
|
||||
expected_size = max_size
|
||||
logger.debug(
|
||||
f"File size unknown, assuming file is max allowable size: {max_size}"
|
||||
)
|
||||
else:
|
||||
if int(expected_size) > max_size:
|
||||
msg = "Requested file is too large > %r bytes" % (max_size,)
|
||||
logger.warning(
|
||||
"{%s} [%s] %s",
|
||||
request.txn_id,
|
||||
request.destination,
|
||||
msg,
|
||||
)
|
||||
raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
|
||||
|
||||
read_body, _ = await download_ratelimiter.can_do_action(
|
||||
requester=None,
|
||||
key=ip_address,
|
||||
n_actions=expected_size,
|
||||
)
|
||||
if not read_body:
|
||||
msg = "Requested file size exceeds ratelimits"
|
||||
logger.warning(
|
||||
"{%s} [%s] %s",
|
||||
request.txn_id,
|
||||
request.destination,
|
||||
msg,
|
||||
read_body, _ = await download_ratelimiter.can_do_action(
|
||||
requester=None,
|
||||
key=ip_address,
|
||||
n_actions=expected_size,
|
||||
)
|
||||
raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
|
||||
if not read_body:
|
||||
msg = "Requested file size exceeds ratelimits"
|
||||
logger.warning(
|
||||
"{%s} [%s] %s",
|
||||
request.txn_id,
|
||||
request.destination,
|
||||
msg,
|
||||
)
|
||||
raise SynapseError(
|
||||
HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
|
||||
)
|
||||
|
||||
# this should be a multipart/mixed response with the boundary string in the header
|
||||
try:
|
||||
@@ -1672,11 +1698,12 @@ class MatrixFederationHttpClient:
|
||||
raise SynapseError(HTTPStatus.BAD_GATEWAY, msg)
|
||||
|
||||
try:
|
||||
# add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
|
||||
deferred = read_multipart_response(
|
||||
response, output_stream, boundary, expected_size + 1
|
||||
)
|
||||
deferred.addTimeout(self.default_timeout_seconds, self.reactor)
|
||||
async with self.remote_download_linearizer.queue(ip_address):
|
||||
# add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
|
||||
deferred = read_multipart_response(
|
||||
response, output_stream, boundary, expected_size + 1
|
||||
)
|
||||
deferred.addTimeout(self.default_timeout_seconds, self.reactor)
|
||||
except BodyExceededMaxSize:
|
||||
msg = "Requested file is too large > %r bytes" % (expected_size,)
|
||||
logger.warning(
|
||||
@@ -1743,6 +1770,13 @@ class MatrixFederationHttpClient:
|
||||
request.method,
|
||||
request.uri.decode("ascii"),
|
||||
)
|
||||
|
||||
# if we didn't know the length upfront, decrement the actual size from ratelimiter
|
||||
if response.length == UNKNOWN_LENGTH:
|
||||
download_ratelimiter.record_action(
|
||||
requester=None, key=ip_address, n_actions=length
|
||||
)
|
||||
|
||||
return length, headers, multipart_response.json
|
||||
|
||||
|
||||
|
||||
@@ -62,6 +62,15 @@ HOP_BY_HOP_HEADERS = {
|
||||
"Upgrade",
|
||||
}
|
||||
|
||||
if hasattr(Headers, "_canonicalNameCaps"):
|
||||
# Twisted < 24.7.0rc1
|
||||
_canonicalHeaderName = Headers()._canonicalNameCaps # type: ignore[attr-defined]
|
||||
else:
|
||||
# Twisted >= 24.7.0rc1
|
||||
# But note that `_encodeName` still exists on prior versions,
|
||||
# it just encodes differently
|
||||
_canonicalHeaderName = Headers()._encodeName
|
||||
|
||||
|
||||
def parse_connection_header_value(
|
||||
connection_header_value: Optional[bytes],
|
||||
@@ -85,11 +94,10 @@ def parse_connection_header_value(
|
||||
The set of header names that should not be copied over from the remote response.
|
||||
The keys are capitalized in canonical capitalization.
|
||||
"""
|
||||
headers = Headers()
|
||||
extra_headers_to_remove: Set[str] = set()
|
||||
if connection_header_value:
|
||||
extra_headers_to_remove = {
|
||||
headers._canonicalNameCaps(connection_option.strip()).decode("ascii")
|
||||
_canonicalHeaderName(connection_option.strip()).decode("ascii")
|
||||
for connection_option in connection_header_value.split(b",")
|
||||
}
|
||||
|
||||
|
||||
@@ -658,7 +658,7 @@ class SynapseSite(ProxySite):
|
||||
)
|
||||
|
||||
self.site_tag = site_tag
|
||||
self.reactor = reactor
|
||||
self.reactor: ISynapseReactor = reactor
|
||||
|
||||
assert config.http_options is not None
|
||||
proxied = config.http_options.x_forwarded
|
||||
@@ -683,7 +683,7 @@ class SynapseSite(ProxySite):
|
||||
self.access_logger = logging.getLogger(logger_name)
|
||||
self.server_version_string = server_version_string.encode("ascii")
|
||||
|
||||
def log(self, request: SynapseRequest) -> None:
|
||||
def log(self, request: SynapseRequest) -> None: # type: ignore[override]
|
||||
pass
|
||||
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ from types import TracebackType
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Awaitable,
|
||||
BinaryIO,
|
||||
Dict,
|
||||
Generator,
|
||||
List,
|
||||
@@ -37,19 +38,28 @@ from typing import (
|
||||
)
|
||||
|
||||
import attr
|
||||
from zope.interface import implementer
|
||||
|
||||
from twisted.internet import interfaces
|
||||
from twisted.internet.defer import Deferred
|
||||
from twisted.internet.interfaces import IConsumer
|
||||
from twisted.protocols.basic import FileSender
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse.api.errors import Codes, cs_error
|
||||
from synapse.http.server import finish_request, respond_with_json
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.logging.context import (
|
||||
defer_to_threadpool,
|
||||
make_deferred_yieldable,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.util import Clock
|
||||
from synapse.util.async_helpers import DeferredEvent
|
||||
from synapse.util.stringutils import is_ascii
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.databases.main.media_repository import LocalMedia
|
||||
|
||||
|
||||
@@ -122,6 +132,7 @@ def respond_404(request: SynapseRequest) -> None:
|
||||
|
||||
|
||||
async def respond_with_file(
|
||||
hs: "HomeServer",
|
||||
request: SynapseRequest,
|
||||
media_type: str,
|
||||
file_path: str,
|
||||
@@ -138,7 +149,7 @@ async def respond_with_file(
|
||||
add_file_headers(request, media_type, file_size, upload_name)
|
||||
|
||||
with open(file_path, "rb") as f:
|
||||
await make_deferred_yieldable(FileSender().beginFileTransfer(f, request))
|
||||
await ThreadedFileSender(hs).beginFileTransfer(f, request)
|
||||
|
||||
finish_request(request)
|
||||
else:
|
||||
@@ -601,3 +612,151 @@ def _parseparam(s: bytes) -> Generator[bytes, None, None]:
|
||||
f = s[:end]
|
||||
yield f.strip()
|
||||
s = s[end:]
|
||||
|
||||
|
||||
@implementer(interfaces.IPushProducer)
|
||||
class ThreadedFileSender:
|
||||
"""
|
||||
A producer that sends the contents of a file to a consumer, reading from the
|
||||
file on a thread.
|
||||
|
||||
This works by having a loop in a threadpool repeatedly reading from the
|
||||
file, until the consumer pauses the producer. There is then a loop in the
|
||||
main thread that waits until the consumer resumes the producer and then
|
||||
starts reading in the threadpool again.
|
||||
|
||||
This is done to ensure that we're never waiting in the threadpool, as
|
||||
otherwise its easy to starve it of threads.
|
||||
"""
|
||||
|
||||
# How much data to read in one go.
|
||||
CHUNK_SIZE = 2**14
|
||||
|
||||
# How long we wait for the consumer to be ready again before aborting the
|
||||
# read.
|
||||
TIMEOUT_SECONDS = 90.0
|
||||
|
||||
def __init__(self, hs: "HomeServer") -> None:
|
||||
self.reactor = hs.get_reactor()
|
||||
self.thread_pool = hs.get_media_sender_thread_pool()
|
||||
|
||||
self.file: Optional[BinaryIO] = None
|
||||
self.deferred: "Deferred[None]" = Deferred()
|
||||
self.consumer: Optional[interfaces.IConsumer] = None
|
||||
|
||||
# Signals if the thread should keep reading/sending data. Set means
|
||||
# continue, clear means pause.
|
||||
self.wakeup_event = DeferredEvent(self.reactor)
|
||||
|
||||
# Signals if the thread should terminate, e.g. because the consumer has
|
||||
# gone away.
|
||||
self.stop_writing = False
|
||||
|
||||
def beginFileTransfer(
|
||||
self, file: BinaryIO, consumer: interfaces.IConsumer
|
||||
) -> "Deferred[None]":
|
||||
"""
|
||||
Begin transferring a file
|
||||
"""
|
||||
self.file = file
|
||||
self.consumer = consumer
|
||||
|
||||
self.consumer.registerProducer(self, True)
|
||||
|
||||
# We set the wakeup signal as we should start producing immediately.
|
||||
self.wakeup_event.set()
|
||||
run_in_background(self.start_read_loop)
|
||||
|
||||
return make_deferred_yieldable(self.deferred)
|
||||
|
||||
def resumeProducing(self) -> None:
|
||||
"""interfaces.IPushProducer"""
|
||||
self.wakeup_event.set()
|
||||
|
||||
def pauseProducing(self) -> None:
|
||||
"""interfaces.IPushProducer"""
|
||||
self.wakeup_event.clear()
|
||||
|
||||
def stopProducing(self) -> None:
|
||||
"""interfaces.IPushProducer"""
|
||||
|
||||
# Unregister the consumer so we don't try and interact with it again.
|
||||
if self.consumer:
|
||||
self.consumer.unregisterProducer()
|
||||
|
||||
self.consumer = None
|
||||
|
||||
# Terminate the loop.
|
||||
self.stop_writing = True
|
||||
self.wakeup_event.set()
|
||||
|
||||
if not self.deferred.called:
|
||||
self.deferred.errback(Exception("Consumer asked us to stop producing"))
|
||||
|
||||
async def start_read_loop(self) -> None:
|
||||
"""This is the loop that drives reading/writing"""
|
||||
try:
|
||||
while not self.stop_writing:
|
||||
# Start the loop in the threadpool to read data.
|
||||
more_data = await defer_to_threadpool(
|
||||
self.reactor, self.thread_pool, self._on_thread_read_loop
|
||||
)
|
||||
if not more_data:
|
||||
# Reached EOF, we can just return.
|
||||
return
|
||||
|
||||
if not self.wakeup_event.is_set():
|
||||
ret = await self.wakeup_event.wait(self.TIMEOUT_SECONDS)
|
||||
if not ret:
|
||||
raise Exception("Timed out waiting to resume")
|
||||
except Exception:
|
||||
self._error(Failure())
|
||||
finally:
|
||||
self._finish()
|
||||
|
||||
def _on_thread_read_loop(self) -> bool:
|
||||
"""This is the loop that happens on a thread.
|
||||
|
||||
Returns:
|
||||
Whether there is more data to send.
|
||||
"""
|
||||
|
||||
while not self.stop_writing and self.wakeup_event.is_set():
|
||||
# The file should always have been set before we get here.
|
||||
assert self.file is not None
|
||||
|
||||
chunk = self.file.read(self.CHUNK_SIZE)
|
||||
if not chunk:
|
||||
return False
|
||||
|
||||
self.reactor.callFromThread(self._write, chunk)
|
||||
|
||||
return True
|
||||
|
||||
def _write(self, chunk: bytes) -> None:
|
||||
"""Called from the thread to write a chunk of data"""
|
||||
if self.consumer:
|
||||
self.consumer.write(chunk)
|
||||
|
||||
def _error(self, failure: Failure) -> None:
|
||||
"""Called when there was a fatal error"""
|
||||
if self.consumer:
|
||||
self.consumer.unregisterProducer()
|
||||
self.consumer = None
|
||||
|
||||
if not self.deferred.called:
|
||||
self.deferred.errback(failure)
|
||||
|
||||
def _finish(self) -> None:
|
||||
"""Called when we have finished writing (either on success or
|
||||
failure)."""
|
||||
if self.file:
|
||||
self.file.close()
|
||||
self.file = None
|
||||
|
||||
if self.consumer:
|
||||
self.consumer.unregisterProducer()
|
||||
self.consumer = None
|
||||
|
||||
if not self.deferred.called:
|
||||
self.deferred.callback(None)
|
||||
|
||||
@@ -430,6 +430,7 @@ class MediaRepository:
|
||||
media_id: str,
|
||||
name: Optional[str],
|
||||
max_timeout_ms: int,
|
||||
allow_authenticated: bool = True,
|
||||
federation: bool = False,
|
||||
) -> None:
|
||||
"""Responds to requests for local media, if exists, or returns 404.
|
||||
@@ -442,6 +443,7 @@ class MediaRepository:
|
||||
the filename in the Content-Disposition header of the response.
|
||||
max_timeout_ms: the maximum number of milliseconds to wait for the
|
||||
media to be uploaded.
|
||||
allow_authenticated: whether media marked as authenticated may be served to this request
|
||||
federation: whether the local media being fetched is for a federation request
|
||||
|
||||
Returns:
|
||||
@@ -451,6 +453,10 @@ class MediaRepository:
|
||||
if not media_info:
|
||||
return
|
||||
|
||||
if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
|
||||
if media_info.authenticated:
|
||||
raise NotFoundError()
|
||||
|
||||
self.mark_recently_accessed(None, media_id)
|
||||
|
||||
media_type = media_info.media_type
|
||||
@@ -481,6 +487,7 @@ class MediaRepository:
|
||||
max_timeout_ms: int,
|
||||
ip_address: str,
|
||||
use_federation_endpoint: bool,
|
||||
allow_authenticated: bool = True,
|
||||
) -> None:
|
||||
"""Respond to requests for remote media.
|
||||
|
||||
@@ -495,6 +502,8 @@ class MediaRepository:
|
||||
ip_address: the IP address of the requester
|
||||
use_federation_endpoint: whether to request the remote media over the new
|
||||
federation `/download` endpoint
|
||||
allow_authenticated: whether media marked as authenticated may be served to this
|
||||
request
|
||||
|
||||
Returns:
|
||||
Resolves once a response has successfully been written to request
|
||||
@@ -526,6 +535,7 @@ class MediaRepository:
|
||||
self.download_ratelimiter,
|
||||
ip_address,
|
||||
use_federation_endpoint,
|
||||
allow_authenticated,
|
||||
)
|
||||
|
||||
# We deliberately stream the file outside the lock
|
||||
@@ -548,6 +558,7 @@ class MediaRepository:
|
||||
max_timeout_ms: int,
|
||||
ip_address: str,
|
||||
use_federation: bool,
|
||||
allow_authenticated: bool,
|
||||
) -> RemoteMedia:
|
||||
"""Gets the media info associated with the remote file, downloading
|
||||
if necessary.
|
||||
@@ -560,6 +571,8 @@ class MediaRepository:
|
||||
ip_address: IP address of the requester
|
||||
use_federation: if a download is necessary, whether to request the remote file
|
||||
over the federation `/download` endpoint
|
||||
allow_authenticated: whether media marked as authenticated may be served to this
|
||||
request
|
||||
|
||||
Returns:
|
||||
The media info of the file
|
||||
@@ -581,6 +594,7 @@ class MediaRepository:
|
||||
self.download_ratelimiter,
|
||||
ip_address,
|
||||
use_federation,
|
||||
allow_authenticated,
|
||||
)
|
||||
|
||||
# Ensure we actually use the responder so that it releases resources
|
||||
@@ -598,6 +612,7 @@ class MediaRepository:
|
||||
download_ratelimiter: Ratelimiter,
|
||||
ip_address: str,
|
||||
use_federation_endpoint: bool,
|
||||
allow_authenticated: bool,
|
||||
) -> Tuple[Optional[Responder], RemoteMedia]:
|
||||
"""Looks for media in local cache, if not there then attempt to
|
||||
download from remote server.
|
||||
@@ -619,6 +634,11 @@ class MediaRepository:
|
||||
"""
|
||||
media_info = await self.store.get_cached_remote_media(server_name, media_id)
|
||||
|
||||
if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
|
||||
# if it isn't cached then don't fetch it or if it's authenticated then don't serve it
|
||||
if not media_info or media_info.authenticated:
|
||||
raise NotFoundError()
|
||||
|
||||
# file_id is the ID we use to track the file locally. If we've already
|
||||
# seen the file then reuse the existing ID, otherwise generate a new
|
||||
# one.
|
||||
@@ -792,6 +812,11 @@ class MediaRepository:
|
||||
|
||||
logger.info("Stored remote media in file %r", fname)
|
||||
|
||||
if self.hs.config.media.enable_authenticated_media:
|
||||
authenticated = True
|
||||
else:
|
||||
authenticated = False
|
||||
|
||||
return RemoteMedia(
|
||||
media_origin=server_name,
|
||||
media_id=media_id,
|
||||
@@ -802,6 +827,7 @@ class MediaRepository:
|
||||
filesystem_id=file_id,
|
||||
last_access_ts=time_now_ms,
|
||||
quarantined_by=None,
|
||||
authenticated=authenticated,
|
||||
)
|
||||
|
||||
async def _federation_download_remote_file(
|
||||
@@ -915,6 +941,11 @@ class MediaRepository:
|
||||
|
||||
logger.debug("Stored remote media in file %r", fname)
|
||||
|
||||
if self.hs.config.media.enable_authenticated_media:
|
||||
authenticated = True
|
||||
else:
|
||||
authenticated = False
|
||||
|
||||
return RemoteMedia(
|
||||
media_origin=server_name,
|
||||
media_id=media_id,
|
||||
@@ -925,6 +956,7 @@ class MediaRepository:
|
||||
filesystem_id=file_id,
|
||||
last_access_ts=time_now_ms,
|
||||
quarantined_by=None,
|
||||
authenticated=authenticated,
|
||||
)
|
||||
|
||||
def _get_thumbnail_requirements(
|
||||
@@ -1030,7 +1062,12 @@ class MediaRepository:
|
||||
t_len = os.path.getsize(output_path)
|
||||
|
||||
await self.store.store_local_thumbnail(
|
||||
media_id, t_width, t_height, t_type, t_method, t_len
|
||||
media_id,
|
||||
t_width,
|
||||
t_height,
|
||||
t_type,
|
||||
t_method,
|
||||
t_len,
|
||||
)
|
||||
|
||||
return output_path
|
||||
|
||||
@@ -49,15 +49,11 @@ from zope.interface import implementer
|
||||
from twisted.internet import interfaces
|
||||
from twisted.internet.defer import Deferred
|
||||
from twisted.internet.interfaces import IConsumer
|
||||
from twisted.protocols.basic import FileSender
|
||||
|
||||
from synapse.api.errors import NotFoundError
|
||||
from synapse.logging.context import (
|
||||
defer_to_thread,
|
||||
make_deferred_yieldable,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.logging.context import defer_to_thread, run_in_background
|
||||
from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
|
||||
from synapse.media._base import ThreadedFileSender
|
||||
from synapse.util import Clock
|
||||
from synapse.util.file_consumer import BackgroundFileConsumer
|
||||
|
||||
@@ -213,7 +209,7 @@ class MediaStorage:
|
||||
local_path = os.path.join(self.local_media_directory, path)
|
||||
if os.path.exists(local_path):
|
||||
logger.debug("responding with local file %s", local_path)
|
||||
return FileResponder(open(local_path, "rb"))
|
||||
return FileResponder(self.hs, open(local_path, "rb"))
|
||||
logger.debug("local file %s did not exist", local_path)
|
||||
|
||||
for provider in self.storage_providers:
|
||||
@@ -336,13 +332,12 @@ class FileResponder(Responder):
|
||||
is closed when finished streaming.
|
||||
"""
|
||||
|
||||
def __init__(self, open_file: IO):
|
||||
def __init__(self, hs: "HomeServer", open_file: BinaryIO):
|
||||
self.hs = hs
|
||||
self.open_file = open_file
|
||||
|
||||
def write_to_consumer(self, consumer: IConsumer) -> Deferred:
|
||||
return make_deferred_yieldable(
|
||||
FileSender().beginFileTransfer(self.open_file, consumer)
|
||||
)
|
||||
return ThreadedFileSender(self.hs).beginFileTransfer(self.open_file, consumer)
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
@@ -549,7 +544,7 @@ class MultipartFileConsumer:
|
||||
Calculate the content length of the multipart response
|
||||
in bytes.
|
||||
"""
|
||||
if not self.length:
|
||||
if self.length is None:
|
||||
return None
|
||||
# calculate length of json field and content-type, disposition headers
|
||||
json_field = json.dumps(self.json_field)
|
||||
|
||||
@@ -145,6 +145,7 @@ class FileStorageProviderBackend(StorageProvider):
|
||||
|
||||
def __init__(self, hs: "HomeServer", config: str):
|
||||
self.hs = hs
|
||||
self.reactor = hs.get_reactor()
|
||||
self.cache_directory = hs.config.media.media_store_path
|
||||
self.base_directory = config
|
||||
|
||||
@@ -165,7 +166,7 @@ class FileStorageProviderBackend(StorageProvider):
|
||||
shutil_copyfile: Callable[[str, str], str] = shutil.copyfile
|
||||
with start_active_span("shutil_copyfile"):
|
||||
await defer_to_thread(
|
||||
self.hs.get_reactor(),
|
||||
self.reactor,
|
||||
shutil_copyfile,
|
||||
primary_fname,
|
||||
backup_fname,
|
||||
@@ -177,7 +178,7 @@ class FileStorageProviderBackend(StorageProvider):
|
||||
|
||||
backup_fname = os.path.join(self.base_directory, path)
|
||||
if os.path.isfile(backup_fname):
|
||||
return FileResponder(open(backup_fname, "rb"))
|
||||
return FileResponder(self.hs, open(backup_fname, "rb"))
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple, Type
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError, cs_error
|
||||
from synapse.api.errors import Codes, NotFoundError, SynapseError, cs_error
|
||||
from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP
|
||||
from synapse.http.server import respond_with_json
|
||||
from synapse.http.site import SynapseRequest
|
||||
@@ -259,6 +259,7 @@ class ThumbnailProvider:
|
||||
media_storage: MediaStorage,
|
||||
):
|
||||
self.hs = hs
|
||||
self.reactor = hs.get_reactor()
|
||||
self.media_repo = media_repo
|
||||
self.media_storage = media_storage
|
||||
self.store = hs.get_datastores().main
|
||||
@@ -274,6 +275,7 @@ class ThumbnailProvider:
|
||||
m_type: str,
|
||||
max_timeout_ms: int,
|
||||
for_federation: bool,
|
||||
allow_authenticated: bool = True,
|
||||
) -> None:
|
||||
media_info = await self.media_repo.get_local_media_info(
|
||||
request, media_id, max_timeout_ms
|
||||
@@ -281,6 +283,12 @@ class ThumbnailProvider:
|
||||
if not media_info:
|
||||
return
|
||||
|
||||
# if the media the thumbnail is generated from is authenticated, don't serve the
|
||||
# thumbnail over an unauthenticated endpoint
|
||||
if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
|
||||
if media_info.authenticated:
|
||||
raise NotFoundError()
|
||||
|
||||
thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
|
||||
await self._select_and_respond_with_thumbnail(
|
||||
request,
|
||||
@@ -307,14 +315,20 @@ class ThumbnailProvider:
|
||||
desired_type: str,
|
||||
max_timeout_ms: int,
|
||||
for_federation: bool,
|
||||
allow_authenticated: bool = True,
|
||||
) -> None:
|
||||
media_info = await self.media_repo.get_local_media_info(
|
||||
request, media_id, max_timeout_ms
|
||||
)
|
||||
|
||||
if not media_info:
|
||||
return
|
||||
|
||||
# if the media the thumbnail is generated from is authenticated, don't serve the
|
||||
# thumbnail over an unauthenticated endpoint
|
||||
if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
|
||||
if media_info.authenticated:
|
||||
raise NotFoundError()
|
||||
|
||||
thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
|
||||
for info in thumbnail_infos:
|
||||
t_w = info.width == desired_width
|
||||
@@ -360,11 +374,11 @@ class ThumbnailProvider:
|
||||
await respond_with_multipart_responder(
|
||||
self.hs.get_clock(),
|
||||
request,
|
||||
FileResponder(open(file_path, "rb")),
|
||||
FileResponder(self.hs, open(file_path, "rb")),
|
||||
media_info,
|
||||
)
|
||||
else:
|
||||
await respond_with_file(request, desired_type, file_path)
|
||||
await respond_with_file(self.hs, request, desired_type, file_path)
|
||||
else:
|
||||
logger.warning("Failed to generate thumbnail")
|
||||
raise SynapseError(400, "Failed to generate thumbnail.")
|
||||
@@ -381,14 +395,27 @@ class ThumbnailProvider:
|
||||
max_timeout_ms: int,
|
||||
ip_address: str,
|
||||
use_federation: bool,
|
||||
allow_authenticated: bool = True,
|
||||
) -> None:
|
||||
media_info = await self.media_repo.get_remote_media_info(
|
||||
server_name, media_id, max_timeout_ms, ip_address, use_federation
|
||||
server_name,
|
||||
media_id,
|
||||
max_timeout_ms,
|
||||
ip_address,
|
||||
use_federation,
|
||||
allow_authenticated,
|
||||
)
|
||||
if not media_info:
|
||||
respond_404(request)
|
||||
return
|
||||
|
||||
# if the media the thumbnail is generated from is authenticated, don't serve the
|
||||
# thumbnail over an unauthenticated endpoint
|
||||
if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
|
||||
if media_info.authenticated:
|
||||
respond_404(request)
|
||||
return
|
||||
|
||||
thumbnail_infos = await self.store.get_remote_media_thumbnails(
|
||||
server_name, media_id
|
||||
)
|
||||
@@ -429,7 +456,7 @@ class ThumbnailProvider:
|
||||
)
|
||||
|
||||
if file_path:
|
||||
await respond_with_file(request, desired_type, file_path)
|
||||
await respond_with_file(self.hs, request, desired_type, file_path)
|
||||
else:
|
||||
logger.warning("Failed to generate thumbnail")
|
||||
raise SynapseError(400, "Failed to generate thumbnail.")
|
||||
@@ -446,16 +473,28 @@ class ThumbnailProvider:
|
||||
max_timeout_ms: int,
|
||||
ip_address: str,
|
||||
use_federation: bool,
|
||||
allow_authenticated: bool = True,
|
||||
) -> None:
|
||||
# TODO: Don't download the whole remote file
|
||||
# We should proxy the thumbnail from the remote server instead of
|
||||
# downloading the remote file and generating our own thumbnails.
|
||||
media_info = await self.media_repo.get_remote_media_info(
|
||||
server_name, media_id, max_timeout_ms, ip_address, use_federation
|
||||
server_name,
|
||||
media_id,
|
||||
max_timeout_ms,
|
||||
ip_address,
|
||||
use_federation,
|
||||
allow_authenticated,
|
||||
)
|
||||
if not media_info:
|
||||
return
|
||||
|
||||
# if the media the thumbnail is generated from is authenticated, don't serve the
|
||||
# thumbnail over an unauthenticated endpoint
|
||||
if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
|
||||
if media_info.authenticated:
|
||||
raise NotFoundError()
|
||||
|
||||
thumbnail_infos = await self.store.get_remote_media_thumbnails(
|
||||
server_name, media_id
|
||||
)
|
||||
@@ -485,8 +524,8 @@ class ThumbnailProvider:
|
||||
file_id: str,
|
||||
url_cache: bool,
|
||||
for_federation: bool,
|
||||
server_name: Optional[str] = None,
|
||||
media_info: Optional[LocalMedia] = None,
|
||||
server_name: Optional[str] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Respond to a request with an appropriate thumbnail from the previously generated thumbnails.
|
||||
|
||||
@@ -773,6 +773,7 @@ class Notifier:
|
||||
stream_token = await self.event_sources.bound_future_token(stream_token)
|
||||
|
||||
start = self.clock.time_msec()
|
||||
logged = False
|
||||
while True:
|
||||
current_token = self.event_sources.get_current_token()
|
||||
if stream_token.is_before_or_eq(current_token):
|
||||
@@ -783,11 +784,13 @@ class Notifier:
|
||||
if now - start > 10_000:
|
||||
return False
|
||||
|
||||
logger.info(
|
||||
"Waiting for current token to reach %s; currently at %s",
|
||||
stream_token,
|
||||
current_token,
|
||||
)
|
||||
if not logged:
|
||||
logger.info(
|
||||
"Waiting for current token to reach %s; currently at %s",
|
||||
stream_token,
|
||||
current_token,
|
||||
)
|
||||
logged = True
|
||||
|
||||
# TODO: be better
|
||||
await self.clock.sleep(0.5)
|
||||
|
||||
@@ -18,7 +18,8 @@
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
from typing import TYPE_CHECKING, Callable
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
from synapse.http.server import HttpServer, JsonResource
|
||||
from synapse.rest import admin
|
||||
@@ -67,11 +68,64 @@ from synapse.rest.client import (
|
||||
voip,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
RegisterServletsFunc = Callable[["HomeServer", HttpServer], None]
|
||||
|
||||
CLIENT_SERVLET_FUNCTIONS: Tuple[RegisterServletsFunc, ...] = (
|
||||
versions.register_servlets,
|
||||
initial_sync.register_servlets,
|
||||
room.register_deprecated_servlets,
|
||||
events.register_servlets,
|
||||
room.register_servlets,
|
||||
login.register_servlets,
|
||||
profile.register_servlets,
|
||||
presence.register_servlets,
|
||||
directory.register_servlets,
|
||||
voip.register_servlets,
|
||||
pusher.register_servlets,
|
||||
push_rule.register_servlets,
|
||||
logout.register_servlets,
|
||||
sync.register_servlets,
|
||||
filter.register_servlets,
|
||||
account.register_servlets,
|
||||
register.register_servlets,
|
||||
auth.register_servlets,
|
||||
receipts.register_servlets,
|
||||
read_marker.register_servlets,
|
||||
room_keys.register_servlets,
|
||||
keys.register_servlets,
|
||||
tokenrefresh.register_servlets,
|
||||
tags.register_servlets,
|
||||
account_data.register_servlets,
|
||||
reporting.register_servlets,
|
||||
openid.register_servlets,
|
||||
notifications.register_servlets,
|
||||
devices.register_servlets,
|
||||
thirdparty.register_servlets,
|
||||
sendtodevice.register_servlets,
|
||||
user_directory.register_servlets,
|
||||
room_upgrade_rest_servlet.register_servlets,
|
||||
capabilities.register_servlets,
|
||||
account_validity.register_servlets,
|
||||
relations.register_servlets,
|
||||
password_policy.register_servlets,
|
||||
knock.register_servlets,
|
||||
appservice_ping.register_servlets,
|
||||
admin.register_servlets_for_client_rest_resource,
|
||||
mutual_rooms.register_servlets,
|
||||
login_token_request.register_servlets,
|
||||
rendezvous.register_servlets,
|
||||
auth_issuer.register_servlets,
|
||||
)
|
||||
|
||||
SERVLET_GROUPS: Dict[str, Iterable[RegisterServletsFunc]] = {
|
||||
"client": CLIENT_SERVLET_FUNCTIONS,
|
||||
}
|
||||
|
||||
|
||||
class ClientRestResource(JsonResource):
|
||||
"""Matrix Client API REST resource.
|
||||
@@ -83,80 +137,56 @@ class ClientRestResource(JsonResource):
|
||||
* etc
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
def __init__(self, hs: "HomeServer", servlet_groups: Optional[List[str]] = None):
|
||||
JsonResource.__init__(self, hs, canonical_json=False)
|
||||
self.register_servlets(self, hs)
|
||||
if hs.config.media.can_load_media_repo:
|
||||
# This import is here to prevent a circular import failure
|
||||
from synapse.rest.client import media
|
||||
|
||||
SERVLET_GROUPS["media"] = (media.register_servlets,)
|
||||
self.register_servlets(self, hs, servlet_groups)
|
||||
|
||||
@staticmethod
|
||||
def register_servlets(client_resource: HttpServer, hs: "HomeServer") -> None:
|
||||
def register_servlets(
|
||||
client_resource: HttpServer,
|
||||
hs: "HomeServer",
|
||||
servlet_groups: Optional[Iterable[str]] = None,
|
||||
) -> None:
|
||||
# Some servlets are only registered on the main process (and not worker
|
||||
# processes).
|
||||
is_main_process = hs.config.worker.worker_app is None
|
||||
|
||||
versions.register_servlets(hs, client_resource)
|
||||
if not servlet_groups:
|
||||
servlet_groups = SERVLET_GROUPS.keys()
|
||||
|
||||
# Deprecated in r0
|
||||
initial_sync.register_servlets(hs, client_resource)
|
||||
room.register_deprecated_servlets(hs, client_resource)
|
||||
for servlet_group in servlet_groups:
|
||||
# Fail on unknown servlet groups.
|
||||
if servlet_group not in SERVLET_GROUPS:
|
||||
if servlet_group == "media":
|
||||
logger.warn(
|
||||
"media.can_load_media_repo needs to be configured for the media servlet to be available"
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"Attempting to register unknown client servlet: '{servlet_group}'"
|
||||
)
|
||||
|
||||
# Partially deprecated in r0
|
||||
events.register_servlets(hs, client_resource)
|
||||
for servletfunc in SERVLET_GROUPS[servlet_group]:
|
||||
if not is_main_process and servletfunc in [
|
||||
pusher.register_servlets,
|
||||
logout.register_servlets,
|
||||
auth.register_servlets,
|
||||
tokenrefresh.register_servlets,
|
||||
reporting.register_servlets,
|
||||
openid.register_servlets,
|
||||
thirdparty.register_servlets,
|
||||
room_upgrade_rest_servlet.register_servlets,
|
||||
account_validity.register_servlets,
|
||||
admin.register_servlets_for_client_rest_resource,
|
||||
mutual_rooms.register_servlets,
|
||||
login_token_request.register_servlets,
|
||||
rendezvous.register_servlets,
|
||||
auth_issuer.register_servlets,
|
||||
]:
|
||||
continue
|
||||
|
||||
room.register_servlets(hs, client_resource)
|
||||
login.register_servlets(hs, client_resource)
|
||||
profile.register_servlets(hs, client_resource)
|
||||
presence.register_servlets(hs, client_resource)
|
||||
directory.register_servlets(hs, client_resource)
|
||||
voip.register_servlets(hs, client_resource)
|
||||
if is_main_process:
|
||||
pusher.register_servlets(hs, client_resource)
|
||||
push_rule.register_servlets(hs, client_resource)
|
||||
if is_main_process:
|
||||
logout.register_servlets(hs, client_resource)
|
||||
sync.register_servlets(hs, client_resource)
|
||||
filter.register_servlets(hs, client_resource)
|
||||
account.register_servlets(hs, client_resource)
|
||||
register.register_servlets(hs, client_resource)
|
||||
if is_main_process:
|
||||
auth.register_servlets(hs, client_resource)
|
||||
receipts.register_servlets(hs, client_resource)
|
||||
read_marker.register_servlets(hs, client_resource)
|
||||
room_keys.register_servlets(hs, client_resource)
|
||||
keys.register_servlets(hs, client_resource)
|
||||
if is_main_process:
|
||||
tokenrefresh.register_servlets(hs, client_resource)
|
||||
tags.register_servlets(hs, client_resource)
|
||||
account_data.register_servlets(hs, client_resource)
|
||||
if is_main_process:
|
||||
reporting.register_servlets(hs, client_resource)
|
||||
openid.register_servlets(hs, client_resource)
|
||||
notifications.register_servlets(hs, client_resource)
|
||||
devices.register_servlets(hs, client_resource)
|
||||
if is_main_process:
|
||||
thirdparty.register_servlets(hs, client_resource)
|
||||
sendtodevice.register_servlets(hs, client_resource)
|
||||
user_directory.register_servlets(hs, client_resource)
|
||||
if is_main_process:
|
||||
room_upgrade_rest_servlet.register_servlets(hs, client_resource)
|
||||
capabilities.register_servlets(hs, client_resource)
|
||||
if is_main_process:
|
||||
account_validity.register_servlets(hs, client_resource)
|
||||
relations.register_servlets(hs, client_resource)
|
||||
password_policy.register_servlets(hs, client_resource)
|
||||
knock.register_servlets(hs, client_resource)
|
||||
appservice_ping.register_servlets(hs, client_resource)
|
||||
if hs.config.server.enable_media_repo:
|
||||
from synapse.rest.client import media
|
||||
|
||||
media.register_servlets(hs, client_resource)
|
||||
|
||||
# moving to /_synapse/admin
|
||||
if is_main_process:
|
||||
admin.register_servlets_for_client_rest_resource(hs, client_resource)
|
||||
|
||||
# unstable
|
||||
if is_main_process:
|
||||
mutual_rooms.register_servlets(hs, client_resource)
|
||||
login_token_request.register_servlets(hs, client_resource)
|
||||
rendezvous.register_servlets(hs, client_resource)
|
||||
auth_issuer.register_servlets(hs, client_resource)
|
||||
servletfunc(hs, client_resource)
|
||||
|
||||
@@ -256,9 +256,15 @@ class KeyChangesServlet(RestServlet):
|
||||
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
results = await self.device_handler.get_user_ids_changed(user_id, from_token)
|
||||
device_list_updates = await self.device_handler.get_user_ids_changed(
|
||||
user_id, from_token
|
||||
)
|
||||
|
||||
return 200, results
|
||||
response: JsonDict = {}
|
||||
response["changed"] = list(device_list_updates.changed)
|
||||
response["left"] = list(device_list_updates.left)
|
||||
|
||||
return 200, response
|
||||
|
||||
|
||||
class OneTimeKeyServlet(RestServlet):
|
||||
|
||||
@@ -67,7 +67,8 @@ from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import JsonDict, Requester, StreamToken, ThirdPartyInstanceID, UserID
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.cancellation import cancellable
|
||||
from synapse.util.stringutils import parse_and_validate_server_name, random_string
|
||||
from synapse.util.events import generate_fake_event_id
|
||||
from synapse.util.stringutils import parse_and_validate_server_name
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -325,7 +326,7 @@ class RoomStateEventRestServlet(RestServlet):
|
||||
)
|
||||
event_id = event.event_id
|
||||
except ShadowBanError:
|
||||
event_id = "$" + random_string(43)
|
||||
event_id = generate_fake_event_id()
|
||||
|
||||
set_tag("event_id", event_id)
|
||||
ret = {"event_id": event_id}
|
||||
@@ -377,7 +378,7 @@ class RoomSendEventRestServlet(TransactionRestServlet):
|
||||
)
|
||||
event_id = event.event_id
|
||||
except ShadowBanError:
|
||||
event_id = "$" + random_string(43)
|
||||
event_id = generate_fake_event_id()
|
||||
|
||||
set_tag("event_id", event_id)
|
||||
return 200, {"event_id": event_id}
|
||||
@@ -1193,7 +1194,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
|
||||
|
||||
event_id = event.event_id
|
||||
except ShadowBanError:
|
||||
event_id = "$" + random_string(43)
|
||||
event_id = generate_fake_event_id()
|
||||
|
||||
set_tag("event_id", event_id)
|
||||
return 200, {"event_id": event_id}
|
||||
|
||||
@@ -52,9 +52,9 @@ from synapse.http.servlet import (
|
||||
parse_string,
|
||||
)
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.opentracing import trace_with_opname
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname
|
||||
from synapse.rest.admin.experimental_features import ExperimentalFeature
|
||||
from synapse.types import JsonDict, Requester, StreamToken
|
||||
from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
from synapse.util import json_decoder
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
@@ -881,7 +881,6 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
)
|
||||
|
||||
user = requester.user
|
||||
device_id = requester.device_id
|
||||
|
||||
timeout = parse_integer(request, "timeout", default=0)
|
||||
# Position in the stream
|
||||
@@ -889,22 +888,50 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
|
||||
from_token = None
|
||||
if from_token_string is not None:
|
||||
from_token = await StreamToken.from_string(self.store, from_token_string)
|
||||
from_token = await SlidingSyncStreamToken.from_string(
|
||||
self.store, from_token_string
|
||||
)
|
||||
|
||||
# TODO: We currently don't know whether we're going to use sticky params or
|
||||
# maybe some filters like sync v2 where they are built up once and referenced
|
||||
# by filter ID. For now, we will just prototype with always passing everything
|
||||
# in.
|
||||
body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
|
||||
logger.info("Sliding sync request: %r", body)
|
||||
|
||||
# Tag and log useful data to differentiate requests.
|
||||
set_tag(
|
||||
"sliding_sync.sync_type", "initial" if from_token is None else "incremental"
|
||||
)
|
||||
set_tag("sliding_sync.conn_id", body.conn_id or "")
|
||||
log_kv(
|
||||
{
|
||||
"sliding_sync.lists": {
|
||||
list_name: {
|
||||
"ranges": list_config.ranges,
|
||||
"timeline_limit": list_config.timeline_limit,
|
||||
}
|
||||
for list_name, list_config in (body.lists or {}).items()
|
||||
},
|
||||
"sliding_sync.room_subscriptions": list(
|
||||
(body.room_subscriptions or {}).keys()
|
||||
),
|
||||
# We also include the number of room subscriptions because logs are
|
||||
# limited to 1024 characters and the large room ID list above can be cut
|
||||
# off.
|
||||
"sliding_sync.num_room_subscriptions": len(
|
||||
(body.room_subscriptions or {}).keys()
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
sync_config = SlidingSyncConfig(
|
||||
user=user,
|
||||
device_id=device_id,
|
||||
requester=requester,
|
||||
# FIXME: Currently, we're just manually copying the fields from the
|
||||
# `SlidingSyncBody` into the config. How can we gurantee into the future
|
||||
# `SlidingSyncBody` into the config. How can we guarantee into the future
|
||||
# that we don't forget any? I would like something more structured like
|
||||
# `copy_attributes(from=body, to=config)`
|
||||
conn_id=body.conn_id,
|
||||
lists=body.lists,
|
||||
room_subscriptions=body.room_subscriptions,
|
||||
extensions=body.extensions,
|
||||
@@ -927,7 +954,6 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
|
||||
return 200, response_content
|
||||
|
||||
# TODO: Is there a better way to encode things?
|
||||
async def encode_response(
|
||||
self,
|
||||
requester: Requester,
|
||||
@@ -942,7 +968,9 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
response["rooms"] = await self.encode_rooms(
|
||||
requester, sliding_sync_result.rooms
|
||||
)
|
||||
response["extensions"] = {} # TODO: sliding_sync_result.extensions
|
||||
response["extensions"] = await self.encode_extensions(
|
||||
requester, sliding_sync_result.extensions
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
@@ -995,8 +1023,21 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
if room_result.avatar:
|
||||
serialized_rooms[room_id]["avatar"] = room_result.avatar
|
||||
|
||||
if room_result.heroes:
|
||||
serialized_rooms[room_id]["heroes"] = room_result.heroes
|
||||
if room_result.heroes is not None and len(room_result.heroes) > 0:
|
||||
serialized_heroes = []
|
||||
for hero in room_result.heroes:
|
||||
serialized_hero = {
|
||||
"user_id": hero.user_id,
|
||||
}
|
||||
if hero.display_name is not None:
|
||||
# Not a typo, just how "displayname" is spelled in the spec
|
||||
serialized_hero["displayname"] = hero.display_name
|
||||
|
||||
if hero.avatar_url is not None:
|
||||
serialized_hero["avatar_url"] = hero.avatar_url
|
||||
|
||||
serialized_heroes.append(serialized_hero)
|
||||
serialized_rooms[room_id]["heroes"] = serialized_heroes
|
||||
|
||||
# We should only include the `initial` key if it's `True` to save bandwidth.
|
||||
# The absense of this flag means `False`.
|
||||
@@ -1004,7 +1045,10 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
serialized_rooms[room_id]["initial"] = room_result.initial
|
||||
|
||||
# This will be omitted for invite/knock rooms with `stripped_state`
|
||||
if room_result.required_state is not None:
|
||||
if (
|
||||
room_result.required_state is not None
|
||||
and len(room_result.required_state) > 0
|
||||
):
|
||||
serialized_required_state = (
|
||||
await self.event_serializer.serialize_events(
|
||||
room_result.required_state,
|
||||
@@ -1015,7 +1059,10 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
serialized_rooms[room_id]["required_state"] = serialized_required_state
|
||||
|
||||
# This will be omitted for invite/knock rooms with `stripped_state`
|
||||
if room_result.timeline_events is not None:
|
||||
if (
|
||||
room_result.timeline_events is not None
|
||||
and len(room_result.timeline_events) > 0
|
||||
):
|
||||
serialized_timeline = await self.event_serializer.serialize_events(
|
||||
room_result.timeline_events,
|
||||
time_now,
|
||||
@@ -1043,7 +1090,10 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
serialized_rooms[room_id]["is_dm"] = room_result.is_dm
|
||||
|
||||
# Stripped state only applies to invite/knock rooms
|
||||
if room_result.stripped_state is not None:
|
||||
if (
|
||||
room_result.stripped_state is not None
|
||||
and len(room_result.stripped_state) > 0
|
||||
):
|
||||
# TODO: `knocked_state` but that isn't specced yet.
|
||||
#
|
||||
# TODO: Instead of adding `knocked_state`, it would be good to rename
|
||||
@@ -1054,6 +1104,73 @@ class SlidingSyncRestServlet(RestServlet):
|
||||
|
||||
return serialized_rooms
|
||||
|
||||
async def encode_extensions(
|
||||
self, requester: Requester, extensions: SlidingSyncResult.Extensions
|
||||
) -> JsonDict:
|
||||
serialized_extensions: JsonDict = {}
|
||||
|
||||
if extensions.to_device is not None:
|
||||
serialized_extensions["to_device"] = {
|
||||
"next_batch": extensions.to_device.next_batch,
|
||||
"events": extensions.to_device.events,
|
||||
}
|
||||
|
||||
if extensions.e2ee is not None:
|
||||
serialized_extensions["e2ee"] = {
|
||||
# We always include this because
|
||||
# https://github.com/vector-im/element-android/issues/3725. The spec
|
||||
# isn't terribly clear on when this can be omitted and how a client
|
||||
# would tell the difference between "no keys present" and "nothing
|
||||
# changed" in terms of whole field absent / individual key type entry
|
||||
# absent Corresponding synapse issue:
|
||||
# https://github.com/matrix-org/synapse/issues/10456
|
||||
"device_one_time_keys_count": extensions.e2ee.device_one_time_keys_count,
|
||||
# https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
|
||||
# states that this field should always be included, as long as the
|
||||
# server supports the feature.
|
||||
"device_unused_fallback_key_types": extensions.e2ee.device_unused_fallback_key_types,
|
||||
}
|
||||
|
||||
if extensions.e2ee.device_list_updates is not None:
|
||||
serialized_extensions["e2ee"]["device_lists"] = {}
|
||||
|
||||
serialized_extensions["e2ee"]["device_lists"]["changed"] = list(
|
||||
extensions.e2ee.device_list_updates.changed
|
||||
)
|
||||
serialized_extensions["e2ee"]["device_lists"]["left"] = list(
|
||||
extensions.e2ee.device_list_updates.left
|
||||
)
|
||||
|
||||
if extensions.account_data is not None:
|
||||
serialized_extensions["account_data"] = {
|
||||
# Same as the the top-level `account_data.events` field in Sync v2.
|
||||
"global": [
|
||||
{"type": account_data_type, "content": content}
|
||||
for account_data_type, content in extensions.account_data.global_account_data_map.items()
|
||||
],
|
||||
# Same as the joined room's account_data field in Sync v2, e.g the path
|
||||
# `rooms.join["!foo:bar"].account_data.events`.
|
||||
"rooms": {
|
||||
room_id: [
|
||||
{"type": account_data_type, "content": content}
|
||||
for account_data_type, content in event_map.items()
|
||||
]
|
||||
for room_id, event_map in extensions.account_data.account_data_by_room_map.items()
|
||||
},
|
||||
}
|
||||
|
||||
if extensions.receipts is not None:
|
||||
serialized_extensions["receipts"] = {
|
||||
"rooms": extensions.receipts.room_id_to_receipt_map,
|
||||
}
|
||||
|
||||
if extensions.typing is not None:
|
||||
serialized_extensions["typing"] = {
|
||||
"rooms": extensions.typing.room_id_to_typing_map,
|
||||
}
|
||||
|
||||
return serialized_extensions
|
||||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
SyncRestServlet(hs).register(http_server)
|
||||
|
||||
@@ -64,6 +64,7 @@ class VersionsRestServlet(RestServlet):
|
||||
|
||||
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
msc3881_enabled = self.config.experimental.msc3881_enabled
|
||||
msc3575_enabled = self.config.experimental.msc3575_enabled
|
||||
|
||||
if self.auth.has_access_token(request):
|
||||
requester = await self.auth.get_user_by_req(
|
||||
@@ -77,6 +78,9 @@ class VersionsRestServlet(RestServlet):
|
||||
msc3881_enabled = await self.store.is_feature_enabled(
|
||||
user_id, ExperimentalFeature.MSC3881
|
||||
)
|
||||
msc3575_enabled = await self.store.is_feature_enabled(
|
||||
user_id, ExperimentalFeature.MSC3575
|
||||
)
|
||||
|
||||
return (
|
||||
200,
|
||||
@@ -169,6 +173,8 @@ class VersionsRestServlet(RestServlet):
|
||||
),
|
||||
# MSC4151: Report room API (Client-Server API)
|
||||
"org.matrix.msc4151": self.config.experimental.msc4151_enabled,
|
||||
# Simplified sliding sync
|
||||
"org.matrix.simplified_msc3575": msc3575_enabled,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
@@ -84,7 +84,7 @@ class DownloadResource(RestServlet):
|
||||
|
||||
if self._is_mine_server_name(server_name):
|
||||
await self.media_repo.get_local_media(
|
||||
request, media_id, file_name, max_timeout_ms
|
||||
request, media_id, file_name, max_timeout_ms, allow_authenticated=False
|
||||
)
|
||||
else:
|
||||
allow_remote = parse_boolean(request, "allow_remote", default=True)
|
||||
@@ -106,4 +106,5 @@ class DownloadResource(RestServlet):
|
||||
max_timeout_ms,
|
||||
ip_address,
|
||||
False,
|
||||
allow_authenticated=False,
|
||||
)
|
||||
|
||||
@@ -96,6 +96,7 @@ class ThumbnailResource(RestServlet):
|
||||
m_type,
|
||||
max_timeout_ms,
|
||||
False,
|
||||
allow_authenticated=False,
|
||||
)
|
||||
else:
|
||||
await self.thumbnail_provider.respond_local_thumbnail(
|
||||
@@ -107,6 +108,7 @@ class ThumbnailResource(RestServlet):
|
||||
m_type,
|
||||
max_timeout_ms,
|
||||
False,
|
||||
allow_authenticated=False,
|
||||
)
|
||||
self.media_repo.mark_recently_accessed(None, media_id)
|
||||
else:
|
||||
@@ -134,6 +136,7 @@ class ThumbnailResource(RestServlet):
|
||||
m_type,
|
||||
max_timeout_ms,
|
||||
ip_address,
|
||||
False,
|
||||
use_federation=False,
|
||||
allow_authenticated=False,
|
||||
)
|
||||
self.media_repo.mark_recently_accessed(server_name, media_id)
|
||||
|
||||
@@ -34,6 +34,7 @@ from typing_extensions import TypeAlias
|
||||
|
||||
from twisted.internet.interfaces import IOpenSSLContextFactory
|
||||
from twisted.internet.tcp import Port
|
||||
from twisted.python.threadpool import ThreadPool
|
||||
from twisted.web.iweb import IPolicyForHTTPS
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
@@ -123,6 +124,7 @@ from synapse.http.client import (
|
||||
)
|
||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||
from synapse.media.media_repository import MediaRepository
|
||||
from synapse.metrics import register_threadpool
|
||||
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
|
||||
from synapse.module_api import ModuleApi
|
||||
from synapse.module_api.callbacks import ModuleApiCallbacks
|
||||
@@ -559,6 +561,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
def get_sync_handler(self) -> SyncHandler:
|
||||
return SyncHandler(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_sliding_sync_handler(self) -> SlidingSyncHandler:
|
||||
return SlidingSyncHandler(self)
|
||||
|
||||
@@ -940,3 +943,24 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
@cache_in_self
|
||||
def get_task_scheduler(self) -> TaskScheduler:
|
||||
return TaskScheduler(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_media_sender_thread_pool(self) -> ThreadPool:
|
||||
"""Fetch the threadpool used to read files when responding to media
|
||||
download requests."""
|
||||
|
||||
# We can choose a large threadpool size as these threads predominately
|
||||
# do IO rather than CPU work.
|
||||
media_threadpool = ThreadPool(
|
||||
name="media_threadpool", minthreads=1, maxthreads=50
|
||||
)
|
||||
|
||||
media_threadpool.start()
|
||||
self.get_reactor().addSystemEventTrigger(
|
||||
"during", "shutdown", media_threadpool.stop
|
||||
)
|
||||
|
||||
# Register the threadpool with our metrics.
|
||||
register_threadpool("media", media_threadpool)
|
||||
|
||||
return media_threadpool
|
||||
|
||||
@@ -120,10 +120,15 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
"get_user_in_room_with_profile", (room_id, user_id)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"_get_rooms_for_local_user_where_membership_is_inner", (user_id,)
|
||||
)
|
||||
|
||||
# Purge other caches based on room state.
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
|
||||
|
||||
def _invalidate_state_caches_all(self, room_id: str) -> None:
|
||||
"""Invalidates caches that are based on the current state, but does
|
||||
@@ -146,7 +151,12 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
|
||||
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"_get_rooms_for_local_user_where_membership_is_inner", None
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
|
||||
|
||||
def _attempt_to_invalidate_cache(
|
||||
self, cache_name: str, key: Optional[Collection[Any]]
|
||||
|
||||
@@ -268,13 +268,23 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
|
||||
|
||||
if data.type == EventTypes.Member:
|
||||
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_rooms_for_user", (data.state_key,)
|
||||
)
|
||||
elif data.type == EventTypes.RoomEncryption:
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_room_encryption", (data.room_id,)
|
||||
)
|
||||
elif data.type == EventTypes.Create:
|
||||
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
|
||||
elif row.type == EventsStreamAllStateRow.TypeId:
|
||||
assert isinstance(data, EventsStreamAllStateRow)
|
||||
# Similar to the above, but the entire caches are invalidated. This is
|
||||
# unfortunate for the membership caches, but should recover quickly.
|
||||
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
|
||||
self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined]
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
|
||||
else:
|
||||
raise Exception("Unknown events stream row type %s" % (row.type,))
|
||||
|
||||
@@ -331,6 +341,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
"get_invited_rooms_for_local_user", (state_key,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"_get_rooms_for_local_user_where_membership_is_inner", (state_key,)
|
||||
)
|
||||
|
||||
self._attempt_to_invalidate_cache(
|
||||
"did_forget",
|
||||
@@ -342,6 +355,10 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_forgotten_rooms_for_user", (state_key,)
|
||||
)
|
||||
elif etype == EventTypes.Create:
|
||||
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
|
||||
elif etype == EventTypes.RoomEncryption:
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
|
||||
|
||||
if relates_to:
|
||||
self._attempt_to_invalidate_cache(
|
||||
@@ -393,12 +410,17 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
|
||||
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"_get_rooms_for_local_user_where_membership_is_inner", None
|
||||
)
|
||||
self._attempt_to_invalidate_cache("did_forget", None)
|
||||
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("get_references_for_event", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_summary", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_participated", None)
|
||||
self._attempt_to_invalidate_cache("get_threads", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
|
||||
|
||||
self._attempt_to_invalidate_cache("_get_state_group_for_event", None)
|
||||
|
||||
@@ -451,6 +473,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
|
||||
self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
|
||||
|
||||
# And delete state caches.
|
||||
|
||||
|
||||
@@ -1313,6 +1313,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
|
||||
# We want to make the cache more effective, so we clamp to the last
|
||||
# change before the given ordering.
|
||||
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) # type: ignore[attr-defined]
|
||||
if last_change is None:
|
||||
# If the room isn't in the cache we know that the last change was
|
||||
# somewhere before the earliest known position of the cache, so we
|
||||
# can clamp to that.
|
||||
last_change = self._events_stream_cache.get_earliest_known_position() # type: ignore[attr-defined]
|
||||
|
||||
# We don't always have a full stream_to_exterm_id table, e.g. after
|
||||
# the upgrade that introduced it, so we make sure we never ask for a
|
||||
|
||||
@@ -64,6 +64,7 @@ class LocalMedia:
|
||||
quarantined_by: Optional[str]
|
||||
safe_from_quarantine: bool
|
||||
user_id: Optional[str]
|
||||
authenticated: Optional[bool]
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
@@ -77,6 +78,7 @@ class RemoteMedia:
|
||||
created_ts: int
|
||||
last_access_ts: int
|
||||
quarantined_by: Optional[str]
|
||||
authenticated: Optional[bool]
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
@@ -218,6 +220,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
"last_access_ts",
|
||||
"safe_from_quarantine",
|
||||
"user_id",
|
||||
"authenticated",
|
||||
),
|
||||
allow_none=True,
|
||||
desc="get_local_media",
|
||||
@@ -235,6 +238,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
last_access_ts=row[6],
|
||||
safe_from_quarantine=row[7],
|
||||
user_id=row[8],
|
||||
authenticated=row[9],
|
||||
)
|
||||
|
||||
async def get_local_media_by_user_paginate(
|
||||
@@ -290,7 +294,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
last_access_ts,
|
||||
quarantined_by,
|
||||
safe_from_quarantine,
|
||||
user_id
|
||||
user_id,
|
||||
authenticated
|
||||
FROM local_media_repository
|
||||
WHERE user_id = ?
|
||||
ORDER BY {order_by_column} {order}, media_id ASC
|
||||
@@ -314,6 +319,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
quarantined_by=row[7],
|
||||
safe_from_quarantine=bool(row[8]),
|
||||
user_id=row[9],
|
||||
authenticated=row[10],
|
||||
)
|
||||
for row in txn
|
||||
]
|
||||
@@ -417,12 +423,18 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
time_now_ms: int,
|
||||
user_id: UserID,
|
||||
) -> None:
|
||||
if self.hs.config.media.enable_authenticated_media:
|
||||
authenticated = True
|
||||
else:
|
||||
authenticated = False
|
||||
|
||||
await self.db_pool.simple_insert(
|
||||
"local_media_repository",
|
||||
{
|
||||
"media_id": media_id,
|
||||
"created_ts": time_now_ms,
|
||||
"user_id": user_id.to_string(),
|
||||
"authenticated": authenticated,
|
||||
},
|
||||
desc="store_local_media_id",
|
||||
)
|
||||
@@ -438,6 +450,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
user_id: UserID,
|
||||
url_cache: Optional[str] = None,
|
||||
) -> None:
|
||||
if self.hs.config.media.enable_authenticated_media:
|
||||
authenticated = True
|
||||
else:
|
||||
authenticated = False
|
||||
|
||||
await self.db_pool.simple_insert(
|
||||
"local_media_repository",
|
||||
{
|
||||
@@ -448,6 +465,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
"media_length": media_length,
|
||||
"user_id": user_id.to_string(),
|
||||
"url_cache": url_cache,
|
||||
"authenticated": authenticated,
|
||||
},
|
||||
desc="store_local_media",
|
||||
)
|
||||
@@ -638,6 +656,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
"filesystem_id",
|
||||
"last_access_ts",
|
||||
"quarantined_by",
|
||||
"authenticated",
|
||||
),
|
||||
allow_none=True,
|
||||
desc="get_cached_remote_media",
|
||||
@@ -654,6 +673,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
filesystem_id=row[4],
|
||||
last_access_ts=row[5],
|
||||
quarantined_by=row[6],
|
||||
authenticated=row[7],
|
||||
)
|
||||
|
||||
async def store_cached_remote_media(
|
||||
@@ -666,6 +686,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
upload_name: Optional[str],
|
||||
filesystem_id: str,
|
||||
) -> None:
|
||||
if self.hs.config.media.enable_authenticated_media:
|
||||
authenticated = True
|
||||
else:
|
||||
authenticated = False
|
||||
|
||||
await self.db_pool.simple_insert(
|
||||
"remote_media_cache",
|
||||
{
|
||||
@@ -677,6 +702,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
"upload_name": upload_name,
|
||||
"filesystem_id": filesystem_id,
|
||||
"last_access_ts": time_now_ms,
|
||||
"authenticated": authenticated,
|
||||
},
|
||||
desc="store_cached_remote_media",
|
||||
)
|
||||
|
||||
@@ -144,6 +144,16 @@ class ProfileWorkerStore(SQLBaseStore):
|
||||
return 50
|
||||
|
||||
async def get_profileinfo(self, user_id: UserID) -> ProfileInfo:
|
||||
"""
|
||||
Fetch the display name and avatar URL of a user.
|
||||
|
||||
Args:
|
||||
user_id: The user ID to fetch the profile for.
|
||||
|
||||
Returns:
|
||||
The user's display name and avatar URL. Values may be null if unset
|
||||
or if the user doesn't exist.
|
||||
"""
|
||||
profile = await self.db_pool.simple_select_one(
|
||||
table="profiles",
|
||||
keyvalues={"full_user_id": user_id.to_string()},
|
||||
@@ -158,6 +168,15 @@ class ProfileWorkerStore(SQLBaseStore):
|
||||
return ProfileInfo(avatar_url=profile[1], display_name=profile[0])
|
||||
|
||||
async def get_profile_displayname(self, user_id: UserID) -> Optional[str]:
|
||||
"""
|
||||
Fetch the display name of a user.
|
||||
|
||||
Args:
|
||||
user_id: The user to get the display name for.
|
||||
|
||||
Raises:
|
||||
404 if the user does not exist.
|
||||
"""
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="profiles",
|
||||
keyvalues={"full_user_id": user_id.to_string()},
|
||||
@@ -166,6 +185,15 @@ class ProfileWorkerStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
async def get_profile_avatar_url(self, user_id: UserID) -> Optional[str]:
|
||||
"""
|
||||
Fetch the avatar URL of a user.
|
||||
|
||||
Args:
|
||||
user_id: The user to get the avatar URL for.
|
||||
|
||||
Raises:
|
||||
404 if the user does not exist.
|
||||
"""
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="profiles",
|
||||
keyvalues={"full_user_id": user_id.to_string()},
|
||||
@@ -174,6 +202,12 @@ class ProfileWorkerStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
async def create_profile(self, user_id: UserID) -> None:
|
||||
"""
|
||||
Create a blank profile for a user.
|
||||
|
||||
Args:
|
||||
user_id: The user to create the profile for.
|
||||
"""
|
||||
user_localpart = user_id.localpart
|
||||
await self.db_pool.simple_insert(
|
||||
table="profiles",
|
||||
|
||||
@@ -39,6 +39,7 @@ from typing import (
|
||||
import attr
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||
@@ -279,8 +280,19 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
|
||||
@cached(max_entries=100000) # type: ignore[synapse-@cached-mutable]
|
||||
async def get_room_summary(self, room_id: str) -> Mapping[str, MemberSummary]:
|
||||
"""Get the details of a room roughly suitable for use by the room
|
||||
"""
|
||||
Get the details of a room roughly suitable for use by the room
|
||||
summary extension to /sync. Useful when lazy loading room members.
|
||||
|
||||
Returns the total count of members in the room by membership type, and a
|
||||
truncated list of members (the heroes). This will be the first 6 members of the
|
||||
room:
|
||||
- We want 5 heroes plus 1, in case one of them is the
|
||||
calling user.
|
||||
- They are ordered by `stream_ordering`, which are joined or
|
||||
invited. When no joined or invited members are available, this also includes
|
||||
banned and left users.
|
||||
|
||||
Args:
|
||||
room_id: The room ID to query
|
||||
Returns:
|
||||
@@ -308,23 +320,36 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
for count, membership in txn:
|
||||
res.setdefault(membership, MemberSummary([], count))
|
||||
|
||||
# we order by membership and then fairly arbitrarily by event_id so
|
||||
# heroes are consistent
|
||||
# Note, rejected events will have a null membership field, so
|
||||
# we we manually filter them out.
|
||||
# Order by membership (joins -> invites -> leave (former insiders) ->
|
||||
# everything else (outsiders like bans/knocks), then by `stream_ordering` so
|
||||
# the first members in the room show up first and to make the sort stable
|
||||
# (consistent heroes).
|
||||
#
|
||||
# Note: rejected events will have a null membership field, so we we manually
|
||||
# filter them out.
|
||||
sql = """
|
||||
SELECT state_key, membership, event_id
|
||||
FROM current_state_events
|
||||
WHERE type = 'm.room.member' AND room_id = ?
|
||||
AND membership IS NOT NULL
|
||||
ORDER BY
|
||||
CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
|
||||
event_id ASC
|
||||
CASE membership WHEN ? THEN 1 WHEN ? THEN 2 WHEN ? THEN 3 ELSE 4 END ASC,
|
||||
event_stream_ordering ASC
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
# 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
|
||||
txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
room_id,
|
||||
# Sort order
|
||||
Membership.JOIN,
|
||||
Membership.INVITE,
|
||||
Membership.LEAVE,
|
||||
# 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
|
||||
6,
|
||||
),
|
||||
)
|
||||
for user_id, membership, event_id in txn:
|
||||
summary = res[membership]
|
||||
# we will always have a summary for this membership type at this
|
||||
@@ -398,6 +423,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
return invite
|
||||
return None
|
||||
|
||||
@trace
|
||||
async def get_rooms_for_local_user_where_membership_is(
|
||||
self,
|
||||
user_id: str,
|
||||
@@ -421,9 +447,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
if not membership_list:
|
||||
return []
|
||||
|
||||
rooms = await self.db_pool.runInteraction(
|
||||
"get_rooms_for_local_user_where_membership_is",
|
||||
self._get_rooms_for_local_user_where_membership_is_txn,
|
||||
# Convert membership list to frozen set as a) it needs to be hashable,
|
||||
# and b) we don't care about the order.
|
||||
membership_list = frozenset(membership_list)
|
||||
|
||||
rooms = await self._get_rooms_for_local_user_where_membership_is_inner(
|
||||
user_id,
|
||||
membership_list,
|
||||
)
|
||||
@@ -442,6 +470,24 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
|
||||
return [room for room in rooms if room.room_id not in rooms_to_exclude]
|
||||
|
||||
@cached(max_entries=1000, tree=True)
|
||||
async def _get_rooms_for_local_user_where_membership_is_inner(
|
||||
self,
|
||||
user_id: str,
|
||||
membership_list: Collection[str],
|
||||
) -> Sequence[RoomsForUser]:
|
||||
if not membership_list:
|
||||
return []
|
||||
|
||||
rooms = await self.db_pool.runInteraction(
|
||||
"get_rooms_for_local_user_where_membership_is",
|
||||
self._get_rooms_for_local_user_where_membership_is_txn,
|
||||
user_id,
|
||||
membership_list,
|
||||
)
|
||||
|
||||
return rooms
|
||||
|
||||
def _get_rooms_for_local_user_where_membership_is_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
@@ -1509,10 +1555,19 @@ def extract_heroes_from_room_summary(
|
||||
) -> List[str]:
|
||||
"""Determine the users that represent a room, from the perspective of the `me` user.
|
||||
|
||||
This function expects `MemberSummary.members` to already be sorted by
|
||||
`stream_ordering` like the results from `get_room_summary(...)`.
|
||||
|
||||
The rules which say which users we select are specified in the "Room Summary"
|
||||
section of
|
||||
https://spec.matrix.org/v1.4/client-server-api/#get_matrixclientv3sync
|
||||
|
||||
|
||||
Args:
|
||||
details: Mapping from membership type to member summary. We expect
|
||||
`MemberSummary.members` to already be sorted by `stream_ordering`.
|
||||
me: The user for whom we are determining the heroes for.
|
||||
|
||||
Returns a list (possibly empty) of heroes' mxids.
|
||||
"""
|
||||
empty_ms = MemberSummary([], 0)
|
||||
@@ -1527,11 +1582,11 @@ def extract_heroes_from_room_summary(
|
||||
r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me
|
||||
] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me]
|
||||
|
||||
# FIXME: order by stream ordering rather than as returned by SQL
|
||||
# We expect `MemberSummary.members` to already be sorted by `stream_ordering`
|
||||
if joined_user_ids or invited_user_ids:
|
||||
return sorted(joined_user_ids + invited_user_ids)[0:5]
|
||||
return (joined_user_ids + invited_user_ids)[0:5]
|
||||
else:
|
||||
return sorted(gone_user_ids)[0:5]
|
||||
return gone_user_ids[0:5]
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
|
||||
@@ -30,6 +30,7 @@ from typing import (
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
MutableMapping,
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
@@ -41,7 +42,7 @@ from typing import (
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.constants import EventContentFields, EventTypes, Membership
|
||||
from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
|
||||
from synapse.events import EventBase
|
||||
@@ -72,10 +73,18 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
_T = TypeVar("_T")
|
||||
|
||||
|
||||
MAX_STATE_DELTA_HOPS = 100
|
||||
|
||||
|
||||
# Freeze so it's immutable and we can use it as a cache value
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class Sentinel:
|
||||
pass
|
||||
|
||||
|
||||
ROOM_UNKNOWN_SENTINEL = Sentinel()
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class EventMetadata:
|
||||
"""Returned by `get_metadata_for_events`"""
|
||||
@@ -298,6 +307,194 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
create_event = await self.get_event(create_id)
|
||||
return create_event
|
||||
|
||||
@cached(max_entries=10000)
|
||||
async def get_room_type(self, room_id: str) -> Optional[str]:
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedList(cached_method_name="get_room_type", list_name="room_ids")
|
||||
async def bulk_get_room_type(
|
||||
self, room_ids: Set[str]
|
||||
) -> Mapping[str, Union[Optional[str], Sentinel]]:
|
||||
"""
|
||||
Bulk fetch room types for the given rooms (via current state).
|
||||
|
||||
Since this function is cached, any missing values would be cached as `None`. In
|
||||
order to distinguish between an unencrypted room that has `None` encryption and
|
||||
a room that is unknown to the server where we might want to omit the value
|
||||
(which would make it cached as `None`), instead we use the sentinel value
|
||||
`ROOM_UNKNOWN_SENTINEL`.
|
||||
|
||||
Returns:
|
||||
A mapping from room ID to the room's type (`None` is a valid room type).
|
||||
Rooms unknown to this server will return `ROOM_UNKNOWN_SENTINEL`.
|
||||
"""
|
||||
|
||||
def txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> MutableMapping[str, Union[Optional[str], Sentinel]]:
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "room_id", room_ids
|
||||
)
|
||||
|
||||
# We can't rely on `room_stats_state.room_type` if the server has left the
|
||||
# room because the `room_id` will still be in the table but everything will
|
||||
# be set to `None` but `None` is a valid room type value. We join against
|
||||
# the `room_stats_current` table which keeps track of the
|
||||
# `current_state_events` count (and a proxy value `local_users_in_room`
|
||||
# which can used to assume the server is participating in the room and has
|
||||
# current state) to ensure that the data in `room_stats_state` is up-to-date
|
||||
# with the current state.
|
||||
#
|
||||
# FIXME: Use `room_stats_current.current_state_events` instead of
|
||||
# `room_stats_current.local_users_in_room` once
|
||||
# https://github.com/element-hq/synapse/issues/17457 is fixed.
|
||||
sql = f"""
|
||||
SELECT room_id, room_type
|
||||
FROM room_stats_state
|
||||
INNER JOIN room_stats_current USING (room_id)
|
||||
WHERE
|
||||
{clause}
|
||||
AND local_users_in_room > 0
|
||||
"""
|
||||
|
||||
txn.execute(sql, args)
|
||||
|
||||
room_id_to_type_map = {}
|
||||
for row in txn:
|
||||
room_id_to_type_map[row[0]] = row[1]
|
||||
|
||||
return room_id_to_type_map
|
||||
|
||||
results = await self.db_pool.runInteraction(
|
||||
"bulk_get_room_type",
|
||||
txn,
|
||||
)
|
||||
|
||||
# If we haven't updated `room_stats_state` with the room yet, query the
|
||||
# create events directly. This should happen only rarely so we don't
|
||||
# mind if we do this in a loop.
|
||||
for room_id in room_ids - results.keys():
|
||||
try:
|
||||
create_event = await self.get_create_event_for_room(room_id)
|
||||
room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
|
||||
results[room_id] = room_type
|
||||
except NotFoundError:
|
||||
# We use the sentinel value to distinguish between `None` which is a
|
||||
# valid room type and a room that is unknown to the server so the value
|
||||
# is just unset.
|
||||
results[room_id] = ROOM_UNKNOWN_SENTINEL
|
||||
|
||||
return results
|
||||
|
||||
@cached(max_entries=10000)
|
||||
async def get_room_encryption(self, room_id: str) -> Optional[str]:
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedList(cached_method_name="get_room_encryption", list_name="room_ids")
|
||||
async def bulk_get_room_encryption(
|
||||
self, room_ids: Set[str]
|
||||
) -> Mapping[str, Union[Optional[str], Sentinel]]:
|
||||
"""
|
||||
Bulk fetch room encryption for the given rooms (via current state).
|
||||
|
||||
Since this function is cached, any missing values would be cached as `None`. In
|
||||
order to distinguish between an unencrypted room that has `None` encryption and
|
||||
a room that is unknown to the server where we might want to omit the value
|
||||
(which would make it cached as `None`), instead we use the sentinel value
|
||||
`ROOM_UNKNOWN_SENTINEL`.
|
||||
|
||||
Returns:
|
||||
A mapping from room ID to the room's encryption algorithm if the room is
|
||||
encrypted, otherwise `None`. Rooms unknown to this server will return
|
||||
`ROOM_UNKNOWN_SENTINEL`.
|
||||
"""
|
||||
|
||||
def txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> MutableMapping[str, Union[Optional[str], Sentinel]]:
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "room_id", room_ids
|
||||
)
|
||||
|
||||
# We can't rely on `room_stats_state.encryption` if the server has left the
|
||||
# room because the `room_id` will still be in the table but everything will
|
||||
# be set to `None` but `None` is a valid encryption value. We join against
|
||||
# the `room_stats_current` table which keeps track of the
|
||||
# `current_state_events` count (and a proxy value `local_users_in_room`
|
||||
# which can used to assume the server is participating in the room and has
|
||||
# current state) to ensure that the data in `room_stats_state` is up-to-date
|
||||
# with the current state.
|
||||
#
|
||||
# FIXME: Use `room_stats_current.current_state_events` instead of
|
||||
# `room_stats_current.local_users_in_room` once
|
||||
# https://github.com/element-hq/synapse/issues/17457 is fixed.
|
||||
sql = f"""
|
||||
SELECT room_id, encryption
|
||||
FROM room_stats_state
|
||||
INNER JOIN room_stats_current USING (room_id)
|
||||
WHERE
|
||||
{clause}
|
||||
AND local_users_in_room > 0
|
||||
"""
|
||||
|
||||
txn.execute(sql, args)
|
||||
|
||||
room_id_to_encryption_map = {}
|
||||
for row in txn:
|
||||
room_id_to_encryption_map[row[0]] = row[1]
|
||||
|
||||
return room_id_to_encryption_map
|
||||
|
||||
results = await self.db_pool.runInteraction(
|
||||
"bulk_get_room_encryption",
|
||||
txn,
|
||||
)
|
||||
|
||||
# If we haven't updated `room_stats_state` with the room yet, query the state
|
||||
# directly. This should happen only rarely so we don't mind if we do this in a
|
||||
# loop.
|
||||
encryption_event_ids: List[str] = []
|
||||
for room_id in room_ids - results.keys():
|
||||
state_map = await self.get_partial_filtered_current_state_ids(
|
||||
room_id,
|
||||
state_filter=StateFilter.from_types(
|
||||
[
|
||||
(EventTypes.Create, ""),
|
||||
(EventTypes.RoomEncryption, ""),
|
||||
]
|
||||
),
|
||||
)
|
||||
# We can use the create event as a canary to tell whether the server has
|
||||
# seen the room before
|
||||
create_event_id = state_map.get((EventTypes.Create, ""))
|
||||
encryption_event_id = state_map.get((EventTypes.RoomEncryption, ""))
|
||||
|
||||
if create_event_id is None:
|
||||
# We use the sentinel value to distinguish between `None` which is a
|
||||
# valid room type and a room that is unknown to the server so the value
|
||||
# is just unset.
|
||||
results[room_id] = ROOM_UNKNOWN_SENTINEL
|
||||
continue
|
||||
|
||||
if encryption_event_id is None:
|
||||
results[room_id] = None
|
||||
else:
|
||||
encryption_event_ids.append(encryption_event_id)
|
||||
|
||||
encryption_event_map = await self.get_events(encryption_event_ids)
|
||||
|
||||
for encryption_event_id in encryption_event_ids:
|
||||
encryption_event = encryption_event_map.get(encryption_event_id)
|
||||
# If the curent state says there is an encryption event, we should have it
|
||||
# in the database.
|
||||
assert encryption_event is not None
|
||||
|
||||
results[encryption_event.room_id] = encryption_event.content.get(
|
||||
EventContentFields.ENCRYPTION_ALGORITHM
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
@cached(max_entries=100000, iterable=True)
|
||||
async def get_partial_current_state_ids(self, room_id: str) -> StateMap[str]:
|
||||
"""Get the current state event ids for a room based on the
|
||||
|
||||
@@ -24,9 +24,13 @@ from typing import List, Optional, Tuple
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
|
||||
from synapse.storage.databases.main.stream import _filter_results_by_stream
|
||||
from synapse.types import RoomStreamToken, StrCollection
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.iterutils import batch_iter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -156,3 +160,103 @@ class StateDeltasStore(SQLBaseStore):
|
||||
"get_max_stream_id_in_current_state_deltas",
|
||||
self._get_max_stream_id_in_current_state_deltas_txn,
|
||||
)
|
||||
|
||||
@trace
|
||||
async def get_current_state_deltas_for_room(
|
||||
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
|
||||
) -> List[StateDelta]:
|
||||
"""Get the state deltas between two tokens."""
|
||||
|
||||
if not self._curr_state_delta_stream_cache.has_entity_changed(
|
||||
room_id, from_token.stream
|
||||
):
|
||||
return []
|
||||
|
||||
def get_current_state_deltas_for_room_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[StateDelta]:
|
||||
sql = """
|
||||
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
|
||||
FROM current_state_delta_stream
|
||||
WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
|
||||
ORDER BY stream_id ASC
|
||||
"""
|
||||
txn.execute(
|
||||
sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
|
||||
)
|
||||
|
||||
return [
|
||||
StateDelta(
|
||||
stream_id=row[1],
|
||||
room_id=room_id,
|
||||
event_type=row[2],
|
||||
state_key=row[3],
|
||||
event_id=row[4],
|
||||
prev_event_id=row[5],
|
||||
)
|
||||
for row in txn
|
||||
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||
]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
|
||||
)
|
||||
|
||||
@trace
|
||||
async def get_current_state_deltas_for_rooms(
|
||||
self,
|
||||
room_ids: StrCollection,
|
||||
from_token: RoomStreamToken,
|
||||
to_token: RoomStreamToken,
|
||||
) -> List[StateDelta]:
|
||||
"""Get the state deltas between two tokens for the set of rooms."""
|
||||
|
||||
room_ids = self._curr_state_delta_stream_cache.get_entities_changed(
|
||||
room_ids, from_token.stream
|
||||
)
|
||||
if not room_ids:
|
||||
return []
|
||||
|
||||
def get_current_state_deltas_for_rooms_txn(
|
||||
txn: LoggingTransaction,
|
||||
room_ids: StrCollection,
|
||||
) -> List[StateDelta]:
|
||||
clause, args = make_in_list_sql_clause(
|
||||
self.database_engine, "room_id", room_ids
|
||||
)
|
||||
|
||||
sql = f"""
|
||||
SELECT instance_name, stream_id, room_id, type, state_key, event_id, prev_event_id
|
||||
FROM current_state_delta_stream
|
||||
WHERE {clause} AND ? < stream_id AND stream_id <= ?
|
||||
ORDER BY stream_id ASC
|
||||
"""
|
||||
args.append(from_token.stream)
|
||||
args.append(to_token.get_max_stream_pos())
|
||||
|
||||
txn.execute(sql, args)
|
||||
|
||||
return [
|
||||
StateDelta(
|
||||
stream_id=row[1],
|
||||
room_id=row[2],
|
||||
event_type=row[3],
|
||||
state_key=row[4],
|
||||
event_id=row[5],
|
||||
prev_event_id=row[6],
|
||||
)
|
||||
for row in txn
|
||||
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||
]
|
||||
|
||||
results = []
|
||||
for batch in batch_iter(room_ids, 1000):
|
||||
deltas = await self.db_pool.runInteraction(
|
||||
"get_current_state_deltas_for_rooms",
|
||||
get_current_state_deltas_for_rooms_txn,
|
||||
batch,
|
||||
)
|
||||
|
||||
results.extend(deltas)
|
||||
|
||||
return results
|
||||
|
||||
@@ -51,6 +51,7 @@ from typing import (
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Protocol,
|
||||
Set,
|
||||
Tuple,
|
||||
cast,
|
||||
@@ -59,7 +60,7 @@ from typing import (
|
||||
|
||||
import attr
|
||||
from immutabledict import immutabledict
|
||||
from typing_extensions import Literal
|
||||
from typing_extensions import Literal, assert_never
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
@@ -67,7 +68,7 @@ from synapse.api.constants import Direction, EventTypes, Membership
|
||||
from synapse.api.filtering import Filter
|
||||
from synapse.events import EventBase
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.logging.opentracing import tag_args, trace
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
@@ -78,10 +79,11 @@ from synapse.storage.database import (
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
from synapse.types import PersistedEventPosition, RoomStreamToken
|
||||
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
|
||||
from synapse.util.caches.descriptors import cached
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.cancellation import cancellable
|
||||
from synapse.util.iterutils import batch_iter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -96,6 +98,18 @@ _STREAM_TOKEN = "stream"
|
||||
_TOPOLOGICAL_TOKEN = "topological"
|
||||
|
||||
|
||||
class PaginateFunction(Protocol):
|
||||
async def __call__(
|
||||
self,
|
||||
*,
|
||||
room_id: str,
|
||||
from_key: RoomStreamToken,
|
||||
to_key: Optional[RoomStreamToken] = None,
|
||||
direction: Direction = Direction.BACKWARDS,
|
||||
limit: int = 0,
|
||||
) -> Tuple[List[EventBase], RoomStreamToken]: ...
|
||||
|
||||
|
||||
# Used as return values for pagination APIs
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _EventDictReturn:
|
||||
@@ -279,7 +293,7 @@ def generate_pagination_bounds(
|
||||
|
||||
|
||||
def generate_next_token(
|
||||
direction: Direction, last_topo_ordering: int, last_stream_ordering: int
|
||||
direction: Direction, last_topo_ordering: Optional[int], last_stream_ordering: int
|
||||
) -> RoomStreamToken:
|
||||
"""
|
||||
Generate the next room stream token based on the currently returned data.
|
||||
@@ -446,7 +460,6 @@ def _filter_results_by_stream(
|
||||
The `instance_name` arg is optional to handle historic rows, and is
|
||||
interpreted as if it was "master".
|
||||
"""
|
||||
|
||||
if instance_name is None:
|
||||
instance_name = "master"
|
||||
|
||||
@@ -659,33 +672,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
|
||||
async def get_room_events_stream_for_rooms(
|
||||
self,
|
||||
*,
|
||||
room_ids: Collection[str],
|
||||
from_key: RoomStreamToken,
|
||||
to_key: RoomStreamToken,
|
||||
to_key: Optional[RoomStreamToken] = None,
|
||||
direction: Direction = Direction.BACKWARDS,
|
||||
limit: int = 0,
|
||||
order: str = "DESC",
|
||||
) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]:
|
||||
"""Get new room events in stream ordering since `from_key`.
|
||||
|
||||
Args:
|
||||
room_ids
|
||||
from_key: Token from which no events are returned before
|
||||
to_key: Token from which no events are returned after. (This
|
||||
is typically the current stream token)
|
||||
from_key: The token to stream from (starting point and heading in the given
|
||||
direction)
|
||||
to_key: The token representing the end stream position (end point)
|
||||
limit: Maximum number of events to return
|
||||
order: Either "DESC" or "ASC". Determines which events are
|
||||
returned when the result is limited. If "DESC" then the most
|
||||
recent `limit` events are returned, otherwise returns the
|
||||
oldest `limit` events.
|
||||
direction: Indicates whether we are paginating forwards or backwards
|
||||
from `from_key`.
|
||||
|
||||
Returns:
|
||||
A map from room id to a tuple containing:
|
||||
- list of recent events in the room
|
||||
- stream ordering key for the start of the chunk of events returned.
|
||||
|
||||
When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
|
||||
When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
|
||||
"""
|
||||
room_ids = self._events_stream_cache.get_entities_changed(
|
||||
room_ids, from_key.stream
|
||||
)
|
||||
if direction == Direction.FORWARDS:
|
||||
room_ids = self._events_stream_cache.get_entities_changed(
|
||||
room_ids, from_key.stream
|
||||
)
|
||||
elif direction == Direction.BACKWARDS:
|
||||
if to_key is not None:
|
||||
room_ids = self._events_stream_cache.get_entities_changed(
|
||||
room_ids, to_key.stream
|
||||
)
|
||||
else:
|
||||
assert_never(direction)
|
||||
|
||||
if not room_ids:
|
||||
return {}
|
||||
@@ -697,12 +720,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(
|
||||
self.get_room_events_stream_for_room,
|
||||
room_id,
|
||||
from_key,
|
||||
to_key,
|
||||
limit,
|
||||
order=order,
|
||||
self.paginate_room_events_by_stream_ordering,
|
||||
room_id=room_id,
|
||||
from_key=from_key,
|
||||
to_key=to_key,
|
||||
direction=direction,
|
||||
limit=limit,
|
||||
)
|
||||
for room_id in rm_ids
|
||||
],
|
||||
@@ -726,69 +749,122 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
if self._events_stream_cache.has_entity_changed(room_id, from_id)
|
||||
}
|
||||
|
||||
async def get_room_events_stream_for_room(
|
||||
async def paginate_room_events_by_stream_ordering(
|
||||
self,
|
||||
*,
|
||||
room_id: str,
|
||||
from_key: RoomStreamToken,
|
||||
to_key: RoomStreamToken,
|
||||
to_key: Optional[RoomStreamToken] = None,
|
||||
direction: Direction = Direction.BACKWARDS,
|
||||
limit: int = 0,
|
||||
order: str = "DESC",
|
||||
) -> Tuple[List[EventBase], RoomStreamToken]:
|
||||
"""Get new room events in stream ordering since `from_key`.
|
||||
"""
|
||||
Paginate events by `stream_ordering` in the room from the `from_key` in the
|
||||
given `direction` to the `to_key` or `limit`.
|
||||
|
||||
Args:
|
||||
room_id
|
||||
from_key: Token from which no events are returned before
|
||||
to_key: Token from which no events are returned after. (This
|
||||
is typically the current stream token)
|
||||
from_key: The token to stream from (starting point and heading in the given
|
||||
direction)
|
||||
to_key: The token representing the end stream position (end point)
|
||||
direction: Indicates whether we are paginating forwards or backwards
|
||||
from `from_key`.
|
||||
limit: Maximum number of events to return
|
||||
order: Either "DESC" or "ASC". Determines which events are
|
||||
returned when the result is limited. If "DESC" then the most
|
||||
recent `limit` events are returned, otherwise returns the
|
||||
oldest `limit` events.
|
||||
|
||||
Returns:
|
||||
The list of events (in ascending stream order) and the token from the start
|
||||
of the chunk of events returned.
|
||||
"""
|
||||
if from_key == to_key:
|
||||
return [], from_key
|
||||
The results as a list of events and a token that points to the end
|
||||
of the result set. If no events are returned then the end of the
|
||||
stream has been reached (i.e. there are no events between `from_key`
|
||||
and `to_key`).
|
||||
|
||||
has_changed = self._events_stream_cache.has_entity_changed(
|
||||
room_id, from_key.stream
|
||||
)
|
||||
When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
|
||||
When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
|
||||
"""
|
||||
|
||||
# FIXME: When going forwards, we should enforce that the `to_key` is not `None`
|
||||
# because we always need an upper bound when querying the events stream (as
|
||||
# otherwise we'll potentially pick up events that are not fully persisted).
|
||||
|
||||
# We should only be working with `stream_ordering` tokens here
|
||||
assert from_key is None or from_key.topological is None
|
||||
assert to_key is None or to_key.topological is None
|
||||
|
||||
# We can bail early if we're looking forwards, and our `to_key` is already
|
||||
# before our `from_key`.
|
||||
if (
|
||||
direction == Direction.FORWARDS
|
||||
and to_key is not None
|
||||
and to_key.is_before_or_eq(from_key)
|
||||
):
|
||||
# Token selection matches what we do below if there are no rows
|
||||
return [], to_key if to_key else from_key
|
||||
# Or vice-versa, if we're looking backwards and our `from_key` is already before
|
||||
# our `to_key`.
|
||||
elif (
|
||||
direction == Direction.BACKWARDS
|
||||
and to_key is not None
|
||||
and from_key.is_before_or_eq(to_key)
|
||||
):
|
||||
# Token selection matches what we do below if there are no rows
|
||||
return [], to_key if to_key else from_key
|
||||
|
||||
# We can do a quick sanity check to see if any events have been sent in the room
|
||||
# since the earlier token.
|
||||
has_changed = True
|
||||
if direction == Direction.FORWARDS:
|
||||
has_changed = self._events_stream_cache.has_entity_changed(
|
||||
room_id, from_key.stream
|
||||
)
|
||||
elif direction == Direction.BACKWARDS:
|
||||
if to_key is not None:
|
||||
has_changed = self._events_stream_cache.has_entity_changed(
|
||||
room_id, to_key.stream
|
||||
)
|
||||
else:
|
||||
assert_never(direction)
|
||||
|
||||
if not has_changed:
|
||||
return [], from_key
|
||||
# Token selection matches what we do below if there are no rows
|
||||
return [], to_key if to_key else from_key
|
||||
|
||||
order, from_bound, to_bound = generate_pagination_bounds(
|
||||
direction, from_key, to_key
|
||||
)
|
||||
|
||||
bounds = generate_pagination_where_clause(
|
||||
direction=direction,
|
||||
# The empty string will shortcut downstream code to only use the
|
||||
# `stream_ordering` column
|
||||
column_names=("", "stream_ordering"),
|
||||
from_token=from_bound,
|
||||
to_token=to_bound,
|
||||
engine=self.database_engine,
|
||||
)
|
||||
|
||||
def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
|
||||
# To handle tokens with a non-empty instance_map we fetch more
|
||||
# results than necessary and then filter down
|
||||
min_from_id = from_key.stream
|
||||
max_to_id = to_key.get_max_stream_pos()
|
||||
|
||||
sql = """
|
||||
SELECT event_id, instance_name, topological_ordering, stream_ordering
|
||||
sql = f"""
|
||||
SELECT event_id, instance_name, stream_ordering
|
||||
FROM events
|
||||
WHERE
|
||||
room_id = ?
|
||||
AND not outlier
|
||||
AND stream_ordering > ? AND stream_ordering <= ?
|
||||
ORDER BY stream_ordering %s LIMIT ?
|
||||
""" % (
|
||||
order,
|
||||
)
|
||||
txn.execute(sql, (room_id, min_from_id, max_to_id, 2 * limit))
|
||||
AND {bounds}
|
||||
ORDER BY stream_ordering {order} LIMIT ?
|
||||
"""
|
||||
txn.execute(sql, (room_id, 2 * limit))
|
||||
|
||||
rows = [
|
||||
_EventDictReturn(event_id, None, stream_ordering)
|
||||
for event_id, instance_name, topological_ordering, stream_ordering in txn
|
||||
if _filter_results(
|
||||
from_key,
|
||||
to_key,
|
||||
instance_name,
|
||||
topological_ordering,
|
||||
stream_ordering,
|
||||
for event_id, instance_name, stream_ordering in txn
|
||||
if _filter_results_by_stream(
|
||||
lower_token=(
|
||||
to_key if direction == Direction.BACKWARDS else from_key
|
||||
),
|
||||
upper_token=(
|
||||
from_key if direction == Direction.BACKWARDS else to_key
|
||||
),
|
||||
instance_name=instance_name,
|
||||
stream_ordering=stream_ordering,
|
||||
)
|
||||
][:limit]
|
||||
return rows
|
||||
@@ -799,18 +875,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
[r.event_id for r in rows], get_prev_content=True
|
||||
)
|
||||
|
||||
if order.lower() == "desc":
|
||||
ret.reverse()
|
||||
|
||||
if rows:
|
||||
key = RoomStreamToken(stream=min(r.stream_ordering for r in rows))
|
||||
next_key = generate_next_token(
|
||||
direction=direction,
|
||||
last_topo_ordering=None,
|
||||
last_stream_ordering=rows[-1].stream_ordering,
|
||||
)
|
||||
else:
|
||||
# Assume we didn't get anything because there was nothing to
|
||||
# get.
|
||||
key = from_key
|
||||
# TODO (erikj): We should work out what to do here instead. (same as
|
||||
# `_paginate_room_events_by_topological_ordering_txn(...)`)
|
||||
next_key = to_key if to_key else from_key
|
||||
|
||||
return ret, key
|
||||
return ret, next_key
|
||||
|
||||
@trace
|
||||
async def get_current_state_delta_membership_changes_for_user(
|
||||
self,
|
||||
user_id: str,
|
||||
@@ -1116,7 +1194,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
|
||||
rows, token = await self.db_pool.runInteraction(
|
||||
"get_recent_event_ids_for_room",
|
||||
self._paginate_room_events_txn,
|
||||
self._paginate_room_events_by_topological_ordering_txn,
|
||||
room_id,
|
||||
from_token=end_token,
|
||||
limit=limit,
|
||||
@@ -1185,6 +1263,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
|
||||
return None
|
||||
|
||||
@trace
|
||||
async def get_last_event_pos_in_room_before_stream_ordering(
|
||||
self,
|
||||
room_id: str,
|
||||
@@ -1293,6 +1372,126 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
get_last_event_pos_in_room_before_stream_ordering_txn,
|
||||
)
|
||||
|
||||
async def bulk_get_last_event_pos_in_room_before_stream_ordering(
|
||||
self,
|
||||
room_ids: StrCollection,
|
||||
end_token: RoomStreamToken,
|
||||
) -> Dict[str, int]:
|
||||
"""Bulk fetch the stream position of the latest events in the given
|
||||
rooms
|
||||
"""
|
||||
|
||||
min_token = end_token.stream
|
||||
max_token = end_token.get_max_stream_pos()
|
||||
results: Dict[str, int] = {}
|
||||
|
||||
# First, we check for the rooms in the stream change cache to see if we
|
||||
# can just use the latest position from it.
|
||||
missing_room_ids: Set[str] = set()
|
||||
for room_id in room_ids:
|
||||
stream_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id)
|
||||
if stream_pos and stream_pos <= min_token:
|
||||
results[room_id] = stream_pos
|
||||
else:
|
||||
missing_room_ids.add(room_id)
|
||||
|
||||
# Next, we query the stream position from the DB. At first we fetch all
|
||||
# positions less than the *max* stream pos in the token, then filter
|
||||
# them down. We do this as a) this is a cheaper query, and b) the vast
|
||||
# majority of rooms will have a latest token from before the min stream
|
||||
# pos.
|
||||
|
||||
def bulk_get_last_event_pos_txn(
|
||||
txn: LoggingTransaction, batch_room_ids: StrCollection
|
||||
) -> Dict[str, int]:
|
||||
# This query fetches the latest stream position in the rooms before
|
||||
# the given max position.
|
||||
clause, args = make_in_list_sql_clause(
|
||||
self.database_engine, "room_id", batch_room_ids
|
||||
)
|
||||
sql = f"""
|
||||
SELECT room_id, (
|
||||
SELECT stream_ordering FROM events AS e
|
||||
LEFT JOIN rejections USING (event_id)
|
||||
WHERE e.room_id = r.room_id
|
||||
AND stream_ordering <= ?
|
||||
AND NOT outlier
|
||||
AND rejection_reason IS NULL
|
||||
ORDER BY stream_ordering DESC
|
||||
LIMIT 1
|
||||
)
|
||||
FROM rooms AS r
|
||||
WHERE {clause}
|
||||
"""
|
||||
txn.execute(sql, [max_token] + args)
|
||||
return {row[0]: row[1] for row in txn}
|
||||
|
||||
recheck_rooms: Set[str] = set()
|
||||
for batched in batch_iter(missing_room_ids, 1000):
|
||||
result = await self.db_pool.runInteraction(
|
||||
"bulk_get_last_event_pos_in_room_before_stream_ordering",
|
||||
bulk_get_last_event_pos_txn,
|
||||
batched,
|
||||
)
|
||||
|
||||
# Check that the stream position for the rooms are from before the
|
||||
# minimum position of the token. If not then we need to fetch more
|
||||
# rows.
|
||||
for room_id, stream in result.items():
|
||||
if stream <= min_token:
|
||||
results[room_id] = stream
|
||||
else:
|
||||
recheck_rooms.add(room_id)
|
||||
|
||||
if not recheck_rooms:
|
||||
return results
|
||||
|
||||
# For the remaining rooms we need to fetch all rows between the min and
|
||||
# max stream positions in the end token, and filter out the rows that
|
||||
# are after the end token.
|
||||
#
|
||||
# This query should be fast as the range between the min and max should
|
||||
# be small.
|
||||
|
||||
def bulk_get_last_event_pos_recheck_txn(
|
||||
txn: LoggingTransaction, batch_room_ids: StrCollection
|
||||
) -> Dict[str, int]:
|
||||
clause, args = make_in_list_sql_clause(
|
||||
self.database_engine, "room_id", batch_room_ids
|
||||
)
|
||||
sql = f"""
|
||||
SELECT room_id, instance_name, stream_ordering
|
||||
FROM events
|
||||
WHERE ? < stream_ordering AND stream_ordering <= ?
|
||||
AND NOT outlier
|
||||
AND rejection_reason IS NULL
|
||||
AND {clause}
|
||||
ORDER BY stream_ordering ASC
|
||||
"""
|
||||
txn.execute(sql, [min_token, max_token] + args)
|
||||
|
||||
# We take the max stream ordering that is less than the token. Since
|
||||
# we ordered by stream ordering we just need to iterate through and
|
||||
# take the last matching stream ordering.
|
||||
txn_results: Dict[str, int] = {}
|
||||
for row in txn:
|
||||
room_id = row[0]
|
||||
event_pos = PersistedEventPosition(row[1], row[2])
|
||||
if not event_pos.persisted_after(end_token):
|
||||
txn_results[room_id] = event_pos.stream
|
||||
|
||||
return txn_results
|
||||
|
||||
for batched in batch_iter(recheck_rooms, 1000):
|
||||
recheck_result = await self.db_pool.runInteraction(
|
||||
"bulk_get_last_event_pos_in_room_before_stream_ordering_recheck",
|
||||
bulk_get_last_event_pos_recheck_txn,
|
||||
batched,
|
||||
)
|
||||
results.update(recheck_result)
|
||||
|
||||
return results
|
||||
|
||||
async def get_current_room_stream_token_for_room_id(
|
||||
self, room_id: str
|
||||
) -> RoomStreamToken:
|
||||
@@ -1501,7 +1700,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
topological=topological_ordering, stream=stream_ordering
|
||||
)
|
||||
|
||||
rows, start_token = self._paginate_room_events_txn(
|
||||
rows, start_token = self._paginate_room_events_by_topological_ordering_txn(
|
||||
txn,
|
||||
room_id,
|
||||
before_token,
|
||||
@@ -1511,7 +1710,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
)
|
||||
events_before = [r.event_id for r in rows]
|
||||
|
||||
rows, end_token = self._paginate_room_events_txn(
|
||||
rows, end_token = self._paginate_room_events_by_topological_ordering_txn(
|
||||
txn,
|
||||
room_id,
|
||||
after_token,
|
||||
@@ -1674,14 +1873,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
def has_room_changed_since(self, room_id: str, stream_id: int) -> bool:
|
||||
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
|
||||
|
||||
def _paginate_room_events_txn(
|
||||
def _paginate_room_events_by_topological_ordering_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
room_id: str,
|
||||
from_token: RoomStreamToken,
|
||||
to_token: Optional[RoomStreamToken] = None,
|
||||
direction: Direction = Direction.BACKWARDS,
|
||||
limit: int = -1,
|
||||
limit: int = 0,
|
||||
event_filter: Optional[Filter] = None,
|
||||
) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
|
||||
"""Returns list of events before or after a given token.
|
||||
@@ -1703,6 +1902,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
been reached (i.e. there are no events between `from_token` and
|
||||
`to_token`), or `limit` is zero.
|
||||
"""
|
||||
# We can bail early if we're looking forwards, and our `to_key` is already
|
||||
# before our `from_token`.
|
||||
if (
|
||||
direction == Direction.FORWARDS
|
||||
and to_token is not None
|
||||
and to_token.is_before_or_eq(from_token)
|
||||
):
|
||||
# Token selection matches what we do below if there are no rows
|
||||
return [], to_token if to_token else from_token
|
||||
# Or vice-versa, if we're looking backwards and our `from_token` is already before
|
||||
# our `to_token`.
|
||||
elif (
|
||||
direction == Direction.BACKWARDS
|
||||
and to_token is not None
|
||||
and from_token.is_before_or_eq(to_token)
|
||||
):
|
||||
# Token selection matches what we do below if there are no rows
|
||||
return [], to_token if to_token else from_token
|
||||
|
||||
args: List[Any] = [room_id]
|
||||
|
||||
@@ -1787,7 +2004,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
"bounds": bounds,
|
||||
"order": order,
|
||||
}
|
||||
|
||||
txn.execute(sql, args)
|
||||
|
||||
# Filter the result set.
|
||||
@@ -1819,27 +2035,30 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
return rows, next_token
|
||||
|
||||
@trace
|
||||
async def paginate_room_events(
|
||||
@tag_args
|
||||
async def paginate_room_events_by_topological_ordering(
|
||||
self,
|
||||
*,
|
||||
room_id: str,
|
||||
from_key: RoomStreamToken,
|
||||
to_key: Optional[RoomStreamToken] = None,
|
||||
direction: Direction = Direction.BACKWARDS,
|
||||
limit: int = -1,
|
||||
limit: int = 0,
|
||||
event_filter: Optional[Filter] = None,
|
||||
) -> Tuple[List[EventBase], RoomStreamToken]:
|
||||
"""Returns list of events before or after a given token.
|
||||
|
||||
When Direction.FORWARDS: from_key < x <= to_key
|
||||
When Direction.BACKWARDS: from_key >= x > to_key
|
||||
"""
|
||||
Paginate events by `topological_ordering` (tie-break with `stream_ordering`) in
|
||||
the room from the `from_key` in the given `direction` to the `to_key` or
|
||||
`limit`.
|
||||
|
||||
Args:
|
||||
room_id
|
||||
from_key: The token used to stream from
|
||||
to_key: A token which if given limits the results to only those before
|
||||
from_key: The token to stream from (starting point and heading in the given
|
||||
direction)
|
||||
to_key: The token representing the end stream position (end point)
|
||||
direction: Indicates whether we are paginating forwards or backwards
|
||||
from `from_key`.
|
||||
limit: The maximum number of events to return.
|
||||
limit: Maximum number of events to return
|
||||
event_filter: If provided filters the events to those that match the filter.
|
||||
|
||||
Returns:
|
||||
@@ -1847,8 +2066,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
of the result set. If no events are returned then the end of the
|
||||
stream has been reached (i.e. there are no events between `from_key`
|
||||
and `to_key`).
|
||||
|
||||
When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
|
||||
When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
|
||||
"""
|
||||
|
||||
# FIXME: When going forwards, we should enforce that the `to_key` is not `None`
|
||||
# because we always need an upper bound when querying the events stream (as
|
||||
# otherwise we'll potentially pick up events that are not fully persisted).
|
||||
|
||||
# We have these checks outside of the transaction function (txn) to save getting
|
||||
# a DB connection and switching threads if we don't need to.
|
||||
#
|
||||
# We can bail early if we're looking forwards, and our `to_key` is already
|
||||
# before our `from_key`.
|
||||
if (
|
||||
@@ -1871,8 +2100,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
return [], to_key if to_key else from_key
|
||||
|
||||
rows, token = await self.db_pool.runInteraction(
|
||||
"paginate_room_events",
|
||||
self._paginate_room_events_txn,
|
||||
"paginate_room_events_by_topological_ordering",
|
||||
self._paginate_room_events_by_topological_ordering_txn,
|
||||
room_id,
|
||||
from_key,
|
||||
to_key,
|
||||
@@ -1983,3 +2212,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
return RoomStreamToken(stream=last_position.stream - 1)
|
||||
|
||||
return None
|
||||
|
||||
@trace
|
||||
def get_rooms_that_might_have_updates(
|
||||
self, room_ids: StrCollection, from_token: RoomStreamToken
|
||||
) -> StrCollection:
|
||||
"""Filters given room IDs down to those that might have updates, i.e.
|
||||
removes rooms that definitely do not have updates.
|
||||
"""
|
||||
return self._events_stream_cache.get_entities_changed(
|
||||
room_ids, from_token.stream
|
||||
)
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
#
|
||||
#
|
||||
|
||||
SCHEMA_VERSION = 85 # remember to update the list below when updating
|
||||
SCHEMA_VERSION = 86 # remember to update the list below when updating
|
||||
"""Represents the expectations made by the codebase about the database schema
|
||||
|
||||
This should be incremented whenever the codebase changes its requirements on the
|
||||
@@ -139,6 +139,9 @@ Changes in SCHEMA_VERSION = 84
|
||||
|
||||
Changes in SCHEMA_VERSION = 85
|
||||
- Add a column `suspended` to the `users` table
|
||||
|
||||
Changes in SCHEMA_VERSION = 86
|
||||
- Add a column `authenticated` to the tables `local_media_repository` and `remote_media_cache`
|
||||
"""
|
||||
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user