Compare commits

...

13 Commits

Author SHA1 Message Date
Andrew Ferrazzutti
f4320b5a49 Admin API: worker support for Query User Account (#19281) 2025-12-16 17:42:08 +00:00
Tulir Asokan
3989d22a37 Implement pagination for MSC2666 (#19279)
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2025-12-16 15:24:36 +00:00
Joshua Goins
0395b71e25 Fix Mastodon URL previews not showing anything useful (#19231)
Fixes #18444. Inside UrlPreviewer, we need to combine two dicts (one
from oEmbed and one from OpenGraph metadata in the HTML), and in Mastodon's case they were very
different.

Single Page Applications (SPAs) seem to sometimes provide better information in the OpenGraph tags
than the oEmbed stubs, because the oEmbed stubs are filled in with JavaScript that Synapse does
not execute.

This change improves previews on Mastodon and YouTube (for the same reason).

Tested to confirm that Twitter and GitHub previews do not regress.
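A rough Python sketch of the merge described above (function and key names are illustrative, not Synapse's actual internals):

```python
# Hypothetical sketch: combine OpenGraph metadata scraped from the HTML
# with the site's oEmbed response, preferring non-empty OpenGraph values,
# since SPA oEmbed stubs are often filled in by JavaScript that Synapse
# never executes.

def merge_preview_data(open_graph: dict, oembed: dict) -> dict:
    merged = dict(oembed)
    for key, value in open_graph.items():
        if value:  # ignore empty OpenGraph fields
            merged[key] = value
    return merged
```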
2025-12-16 13:02:29 +00:00
Denis Kasak
29fd0116a5 Improve proxy support for the federation_client.py dev script (#19300)
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2025-12-16 11:06:07 +00:00
Travis Ralston
0f2b29511f Allow admins to bypass the quarantine check on media downloads (#19275)
Co-authored-by: turt2live <1190097+turt2live@users.noreply.github.com>
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2025-12-15 17:23:33 +00:00
Andre Klärner
466994743a Document importance of public_baseurl for delegation and OIDC (#19270)
I just stumbled across the fact that my config used delegation as
recommended by the docs, with Synapse hosted on a subdomain. However, my
config never had `public_baseurl` set and worked without issues, until I
tried to set up OIDC just now.

OIDC is initiated by the client, which opens a URL on the homeserver.
Initially the correct URL is called, but Synapse does not recognize it
unless `public_baseurl` is set correctly. After changing this, it
immediately started working.

So, to prevent anybody else from making the same mistake, this adds a
small clarifying block to the OIDC docs.
2025-12-12 18:07:39 -06:00
Devon Hudson
df24e0f302 Fix support for older versions of zope-interface (#19274)
Fixes #19269 

The versions of zope-interface shipped by RHEL, Ubuntu 22.04 & 24.04
LTS, and openSUSE don't support the new Python union `X | Y` syntax for
interfaces. This PR partially reverts the change to fully use the new
syntax, adds a minimum supported version of zope-interface to Synapse's
dependency list, and removes the linter auto-upgrades that prefer the
newer syntax.
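The two union spellings describe the same type; `typing.Union` is simply the portable one for older third-party tooling such as the zope-interface versions shipped by the distributions above. A minimal illustration:

```python
# `X | Y` (PEP 604, Python >= 3.10) and `typing.Union[X, Y]` compare
# equal, but only the `typing.Union` spelling is accepted by libraries
# that predate PEP 604 support.
from typing import Optional, Union

assert Union[int, str] == (int | str)
assert Optional[int] == (int | None)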

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [X] Pull request is based on the develop branch
* [X] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [X] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct (run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2025-12-12 15:34:13 +00:00
Andrew Morgan
048629dd13 minor grammar fix
context: https://github.com/element-hq/synapse/pull/19260#discussion_r2614227743
2025-12-12 13:36:34 +00:00
Mathieu Velten
7347cc436e Add memberships admin API (#19260)
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2025-12-12 13:35:46 +00:00
Travis Ralston
3f636386a6 Add an Admin API endpoint for listing quarantined media (#19268)
Co-authored-by: turt2live <1190097+turt2live@users.noreply.github.com>
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2025-12-12 13:30:21 +00:00
Andrew Morgan
1f7f16477d Unpin Rust from 1.82.0 (#19302) 2025-12-12 11:31:55 +00:00
Erik Johnston
dfd00a986f Fix sliding sync performance slowdown for long-lived connections. (#19206)
Fixes https://github.com/element-hq/synapse/issues/19175

This PR moves tracking of which lazy-loaded memberships we've sent for
each room out of the required-state table. This stops that table from
growing continuously, which massively helps performance, as we pull out
all matching rows for the connection when we receive a request.

The new table is only read when we have data in a room to send, so we
end up reading a lot fewer rows from the DB. Though we now read from
that table for every room we have events to return in, rather than once
at the start of the request.

For an explanation of how the new table works, see the
[comment](https://github.com/element-hq/synapse/blob/erikj/sss_better_membership_storage2/synapse/storage/schema/main/delta/93/02_sliding_sync_members.sql#L15-L38)
on the table schema.

The table is designed so that we can later prune old entries if we wish,
but that is not implemented in this PR.

Reviewable commit-by-commit.
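A rough Python sketch of the bookkeeping described above (class and method names are illustrative, not Synapse's actual storage layer):

```python
# Hypothetical sketch: per (connection, room), remember which
# lazy-loaded memberships have already been sent so they are not
# re-sent, without growing one global required-state record that every
# request must re-read in full.

class SentMemberships:
    def __init__(self):
        # (conn_id, room_id) -> set of user_ids already sent
        self._sent: dict = {}

    def filter_unsent(self, conn_id: str, room_id: str, user_ids: list) -> list:
        # Consulted only for rooms that actually have events to return,
        # mirroring the PR's point that the new table is read per room
        # with data rather than once for the whole connection.
        sent = self._sent.setdefault((conn_id, room_id), set())
        new = [u for u in user_ids if u not in sent]
        sent.update(new)
        return new
```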

---------

Co-authored-by: Eric Eastwood <erice@element.io>
2025-12-12 10:02:57 +00:00
Devon Hudson
cdf286d405 Use uv to test full set of minimum deps in CI (#19289)
Stemming from #19274, this updates the `olddeps` CI to test not just the
minimum version of our explicit dependencies, but also the minimum
version of all implicit (transitive) dependencies pulled in by the
explicit dependencies themselves.

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [X] Pull request is based on the develop branch
* [X] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [X] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct (run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))
2025-12-11 17:58:27 +00:00
72 changed files with 2843 additions and 574 deletions

View File

@@ -7,4 +7,4 @@ if command -v yum &> /dev/null; then
fi
# Install a Rust toolchain
curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain 1.82.0 -y --profile minimal
curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain stable -y --profile minimal

View File

@@ -1,39 +0,0 @@
#!/usr/bin/env bash
# this script is run by GitHub Actions in a plain `jammy` container; it
# - installs the minimal system requirements, and poetry;
# - patches the project definition file to refer to old versions only;
# - creates a venv with these old versions using poetry; and finally
# - invokes `trial` to run the tests with old deps.
set -ex
# Prevent virtualenv from auto-updating pip to an incompatible version
export VIRTUALENV_NO_DOWNLOAD=1
# TODO: in the future, we could use an implementation of
# https://github.com/python-poetry/poetry/issues/3527
# https://github.com/pypa/pip/issues/8085
# to select the lowest possible versions, rather than resorting to this sed script.
# Patch the project definitions in-place:
# - `-E` use extended regex syntax.
# - Don't modify the line that defines required Python versions.
# - Replace all lower and tilde bounds with exact bounds.
# - Replace all caret bounds with exact bounds.
# - Delete all lines referring to psycopg2 - so no testing of postgres support.
# - Use pyopenssl 17.0, which is the oldest version that works with
# a `cryptography` compiled against OpenSSL 1.1.
# - Omit systemd: we're not logging to journal here.
sed -i -E '
/^\s*requires-python\s*=/b
s/[~>]=/==/g
s/\^/==/g
/psycopg2/d
s/pyOpenSSL\s*==\s*16\.0\.0"/pyOpenSSL==17.0.0"/
/systemd/d
' pyproject.toml
echo "::group::Patched pyproject.toml"
cat pyproject.toml
echo "::endgroup::"

View File

@@ -5,7 +5,7 @@ name: Build release artifacts
on:
# we build on PRs and develop to (hopefully) get early warning
# of things breaking (but only build one set of debs). PRs skip
# building wheels on macOS & ARM.
# building wheels on ARM.
pull_request:
push:
branches: ["develop", "release-*"]

View File

@@ -452,14 +452,12 @@ jobs:
python-version: '3.10'
- name: Prepare old deps
if: steps.cache-poetry-old-deps.outputs.cache-hit != 'true'
run: .ci/scripts/prepare_old_deps.sh
# Note: we install using `pip` here, not poetry. `poetry install` ignores the
# build-system section (https://github.com/python-poetry/poetry/issues/6154), but
# we explicitly want to test that you can `pip install` using the oldest version
# of poetry-core and setuptools-rust.
- run: pip install .[all,test]
# Note: we install using `uv` here, not poetry or pip to allow us to test with the
# minimum version of all dependencies, both those explicitly specified and those
# implicitly brought in by the explicit dependencies.
run: |
pip install uv
uv pip install --system --resolution=lowest .[all,test]
# We nuke the local copy, as we've installed synapse into the virtualenv
# (rather than use an editable install, which we no longer support). If we

1
changelog.d/19206.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix sliding sync performance slowdown for long-lived connections.

1
changelog.d/19231.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a bug where Mastodon posts (and possibly other embeds) have the wrong description for URL previews.

View File

@@ -0,0 +1 @@
Add a `memberships` endpoint to the admin API. This is useful for forensic and T&S purposes.

View File

@@ -0,0 +1 @@
Add an admin API for retrieving a paginated list of quarantined media.

1
changelog.d/19270.doc Normal file
View File

@@ -0,0 +1 @@
Document the importance of `public_baseurl` when configuring OpenID Connect authentication.

1
changelog.d/19274.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.143.0 that broke support for versions of `zope-interface` older than 6.2.

View File

@@ -0,0 +1 @@
Server admins can bypass the quarantine media check when downloading media by setting the `admin_unsafely_bypass_quarantine` query parameter to `true` on Client-Server API media download requests.

View File

@@ -0,0 +1 @@
Implement pagination for the [MSC2666](https://github.com/matrix-org/matrix-spec-proposals/pull/2666) mutual rooms endpoint. Contributed by @tulir @ Beeper.

View File

@@ -0,0 +1 @@
Admin API: add worker support to `GET /_synapse/admin/v2/users/<user_id>`.

1
changelog.d/19289.misc Normal file
View File

@@ -0,0 +1 @@
Use `uv` to test olddeps to ensure all transitive dependencies use minimum versions.

View File

@@ -0,0 +1 @@
Improve proxy support for the `federation_client.py` dev script. Contributed by Denis Kasak (@dkasak).

1
changelog.d/19302.misc Normal file
View File

@@ -0,0 +1 @@
Unpin the version of Rust we use to build Synapse wheels (was 1.82.0) now that macOS support has been dropped.

View File

@@ -73,6 +73,33 @@ Response:
}
```
## Listing all quarantined media
This API returns a list of all quarantined media on the server. It is paginated, and can be scoped to either local or
remote media. Note that the pagination values are also scoped to the other request parameters - changing those
parameters while keeping the same pagination values will produce unexpected results.
Request:
```http
GET /_synapse/admin/v1/media/quarantined?from=0&limit=100&kind=local
```
`from` and `limit` are optional parameters, and default to `0` and `100` respectively. They are the row index and number
of rows to return - they are not timestamps.
`kind` *MUST* be either `local` or `remote`.
The API returns a JSON body containing MXC URIs for the quarantined media, like the following:
```json
{
"media": [
"mxc://localhost/xwvutsrqponmlkjihgfedcba",
"mxc://localhost/abcdefghijklmnopqrstuvwx"
]
}
```
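Since `from` is a row index rather than a timestamp, fetching every page just means advancing the offset by the page size. A hedged sketch (`fetch` stands in for an authenticated HTTP GET helper; it is not part of any Synapse client library):

```python
# Hypothetical pagination loop over the quarantined-media listing API.
# Endpoint path and parameter names are taken from the docs above.

def list_quarantined(fetch, kind="local", limit=100):
    """Yield every quarantined MXC URI, page by page."""
    offset = 0
    while True:
        page = fetch(
            "/_synapse/admin/v1/media/quarantined",
            params={"from": offset, "limit": limit, "kind": kind},
        )["media"]
        yield from page
        if len(page) < limit:  # short page: nothing left
            break
        offset += limit
```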
# Quarantine media
Quarantining media marks it as inaccessible to users. It applies
@@ -88,6 +115,20 @@ is quarantined, Synapse will:
- Quarantine any existing cached remote media.
- Quarantine any future remote media.
## Downloading quarantined media
Normally, when media is quarantined, attempts to download it return a 404 error.
Admins can bypass this by adding `?admin_unsafely_bypass_quarantine=true`
to the [normal download URL](https://spec.matrix.org/v1.16/client-server-api/#get_matrixclientv1mediadownloadservernamemediaid).
Bypassing the quarantine check is not recommended. Media is typically quarantined
to prevent harmful content from being served to users, which includes admins. Only
set the bypass parameter if you intentionally want to access potentially harmful
content.
Non-admin users cannot bypass quarantine checks, even when specifying the above
query parameter.
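A small helper sketch for building the bypass URL described above (`base`, `server_name`, and `media_id` are placeholders; the path is the spec's media download endpoint):

```python
# Hypothetical helper: append the admin-only bypass parameter to the
# normal Client-Server API media download URL.
from urllib.parse import urlencode

def quarantine_bypass_url(base: str, server_name: str, media_id: str) -> str:
    path = f"{base}/_matrix/client/v1/media/download/{server_name}/{media_id}"
    return f"{path}?{urlencode({'admin_unsafely_bypass_quarantine': 'true'})}"
```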
## Quarantining media by ID
This API quarantines a single piece of local or remote media.

View File

@@ -505,6 +505,55 @@ with a body of:
}
```
## List room memberships of a user
Gets a list of room memberships for a specific `user_id`. This
endpoint differs from
[`GET /_synapse/admin/v1/users/<user_id>/joined_rooms`](#list-joined-rooms-of-a-user)
in that it returns rooms with memberships other than "join".
The API is:
```
GET /_synapse/admin/v1/users/<user_id>/memberships
```
A response body like the following is returned:
```json
{
"memberships": {
"!DuGcnbhHGaSZQoNQR:matrix.org": "join",
"!ZtSaPCawyWtxfWiIy:matrix.org": "leave",
}
}
```
which maps each room ID to the given user's membership state in that room. This endpoint can
be used with both local and remote users, with the caveat that the homeserver will
only be aware of memberships in rooms that at least one of its local users has joined.
Remote users' memberships may also be out of date if all local users have since left
a room, as the homeserver will no longer receive membership updates for it.
The response includes rooms that the user has since left; other membership states (knock,
invite, etc.) are also possible. Rooms only disappear from the response if they are
[purged](./rooms.md#delete-room-api) from the homeserver.
**Parameters**
The following parameters should be set in the URL:
- `user_id` - fully qualified: for example, `@user:server.com`.
**Response**
The following fields are returned in the JSON response body:
- `memberships` - A map of `room_id` (string) to `membership` state (string).
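A small sketch of consuming the `memberships` map described above, splitting joined rooms from every other membership state (the function name is illustrative):

```python
# Hypothetical helper: partition the endpoint's `memberships` map into
# currently joined rooms and everything else (leave, invite, knock, ban).

def split_memberships(resp: dict):
    joined, other = [], {}
    for room_id, membership in resp["memberships"].items():
        if membership == "join":
            joined.append(room_id)
        else:
            other[room_id] = membership
    return joined, other
```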
## List joined rooms of a user
Gets a list of all rooms (`room_id`s) that a specific `user_id` is joined to (participating in).

View File

@@ -50,6 +50,11 @@ setting in your configuration file.
See the [configuration manual](usage/configuration/config_documentation.md#oidc_providers) for some sample settings, as well as
the text below for example configurations for specific providers.
For setups using [`.well-known` delegation](delegate.md), make sure
[`public_baseurl`](usage/configuration/config_documentation.md#public_baseurl) is set
appropriately. If unset, Synapse defaults to `https://<server_name>/`, which is used in
the OIDC callback URL.
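The note above can be made concrete with a minimal `homeserver.yaml` sketch (hostnames are placeholders, assuming `.well-known` delegation from `example.com` to `matrix.example.com`):

```yaml
server_name: "example.com"
# Without this, Synapse assumes https://example.com/ when building the
# OIDC callback URL, which breaks when Synapse is actually served from
# the subdomain.
public_baseurl: "https://matrix.example.com/"
```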
## OIDC Back-Channel Logout
Synapse supports receiving [OpenID Connect Back-Channel Logout](https://openid.net/specs/openid-connect-backchannel-1_0.html) notifications.

View File

@@ -255,6 +255,8 @@ information.
^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$
^/_matrix/client/(r0|v3|unstable)/capabilities$
^/_matrix/client/(r0|v3|unstable)/notifications$
# Admin API requests
^/_synapse/admin/v1/rooms/[^/]+$
# Encryption requests
@@ -300,6 +302,9 @@ Additionally, the following REST endpoints can be handled for GET requests:
# Presence requests
^/_matrix/client/(api/v1|r0|v3|unstable)/presence/
# Admin API requests
^/_synapse/admin/v2/users/[^/]+$
Pagination requests can also be handled, but all requests for a given
room must be routed to the same instance. Additionally, care must be taken to
ensure that the purge history admin API is not used while pagination requests

109
poetry.lock generated
View File

@@ -31,7 +31,7 @@ description = "The ultimate Python library in building OAuth and OpenID Connect
optional = true
python-versions = ">=3.9"
groups = ["main"]
markers = "extra == \"all\" or extra == \"jwt\" or extra == \"oidc\""
markers = "extra == \"oidc\" or extra == \"jwt\" or extra == \"all\""
files = [
{file = "authlib-1.6.5-py2.py3-none-any.whl", hash = "sha256:3e0e0507807f842b02175507bdee8957a1d5707fd4afb17c32fb43fee90b6e3a"},
{file = "authlib-1.6.5.tar.gz", hash = "sha256:6aaf9c79b7cc96c900f0b284061691c5d4e61221640a948fe690b556a6d6d10b"},
@@ -481,7 +481,7 @@ description = "XML bomb protection for Python stdlib modules"
optional = true
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
groups = ["main"]
markers = "extra == \"all\" or extra == \"saml2\""
markers = "extra == \"saml2\" or extra == \"all\""
files = [
{file = "defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61"},
{file = "defusedxml-0.7.1.tar.gz", hash = "sha256:1bb3032db185915b62d7c6209c5a8792be6a32ab2fedacc84e01b52c51aa3e69"},
@@ -506,7 +506,7 @@ description = "XPath 1.0/2.0/3.0/3.1 parsers and selectors for ElementTree and l
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"all\" or extra == \"saml2\""
markers = "extra == \"saml2\" or extra == \"all\""
files = [
{file = "elementpath-4.1.5-py3-none-any.whl", hash = "sha256:2ac1a2fb31eb22bbbf817f8cf6752f844513216263f0e3892c8e79782fe4bb55"},
{file = "elementpath-4.1.5.tar.gz", hash = "sha256:c2d6dc524b29ef751ecfc416b0627668119d8812441c555d7471da41d4bacb8d"},
@@ -556,7 +556,7 @@ description = "Python wrapper for hiredis"
optional = true
python-versions = ">=3.8"
groups = ["main"]
markers = "extra == \"all\" or extra == \"redis\""
markers = "extra == \"redis\" or extra == \"all\""
files = [
{file = "hiredis-3.3.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:9937d9b69321b393fbace69f55423480f098120bc55a3316e1ca3508c4dbbd6f"},
{file = "hiredis-3.3.0-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:50351b77f89ba6a22aff430b993653847f36b71d444509036baa0f2d79d1ebf4"},
@@ -879,7 +879,7 @@ description = "Jaeger Python OpenTracing Tracer implementation"
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"all\" or extra == \"opentracing\""
markers = "extra == \"opentracing\" or extra == \"all\""
files = [
{file = "jaeger-client-4.8.0.tar.gz", hash = "sha256:3157836edab8e2c209bd2d6ae61113db36f7ee399e66b1dcbb715d87ab49bfe0"},
]
@@ -1017,7 +1017,7 @@ description = "A strictly RFC 4510 conforming LDAP V3 pure Python client library
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\""
markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\""
files = [
{file = "ldap3-2.9.1-py2.py3-none-any.whl", hash = "sha256:5869596fc4948797020d3f03b7939da938778a0f9e2009f7a072ccf92b8e8d70"},
{file = "ldap3-2.9.1.tar.gz", hash = "sha256:f3e7fc4718e3f09dda568b57100095e0ce58633bcabbed8667ce3f8fbaa4229f"},
@@ -1119,7 +1119,7 @@ description = "Powerful and Pythonic XML processing library combining libxml2/li
optional = true
python-versions = ">=3.8"
groups = ["main"]
markers = "extra == \"all\" or extra == \"url-preview\""
markers = "extra == \"url-preview\" or extra == \"all\""
files = [
{file = "lxml-6.0.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:e77dd455b9a16bbd2a5036a63ddbd479c19572af81b624e79ef422f929eef388"},
{file = "lxml-6.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:5d444858b9f07cefff6455b983aea9a67f7462ba1f6cbe4a21e8bf6791bf2153"},
@@ -1405,7 +1405,7 @@ description = "An LDAP3 auth provider for Synapse"
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\""
markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\""
files = [
{file = "matrix-synapse-ldap3-0.3.0.tar.gz", hash = "sha256:8bb6517173164d4b9cc44f49de411d8cebdb2e705d5dd1ea1f38733c4a009e1d"},
{file = "matrix_synapse_ldap3-0.3.0-py3-none-any.whl", hash = "sha256:8b4d701f8702551e98cc1d8c20dbed532de5613584c08d0df22de376ba99159d"},
@@ -1648,7 +1648,7 @@ description = "OpenTracing API for Python. See documentation at http://opentraci
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"all\" or extra == \"opentracing\""
markers = "extra == \"opentracing\" or extra == \"all\""
files = [
{file = "opentracing-2.4.0.tar.gz", hash = "sha256:a173117e6ef580d55874734d1fa7ecb6f3655160b8b8974a2a1e98e5ec9c840d"},
]
@@ -1838,7 +1838,7 @@ description = "psycopg2 - Python-PostgreSQL Database Adapter"
optional = true
python-versions = ">=3.9"
groups = ["main"]
markers = "extra == \"all\" or extra == \"postgres\""
markers = "extra == \"postgres\" or extra == \"all\""
files = [
{file = "psycopg2-2.9.11-cp310-cp310-win_amd64.whl", hash = "sha256:103e857f46bb76908768ead4e2d0ba1d1a130e7b8ed77d3ae91e8b33481813e8"},
{file = "psycopg2-2.9.11-cp311-cp311-win_amd64.whl", hash = "sha256:210daed32e18f35e3140a1ebe059ac29209dd96468f2f7559aa59f75ee82a5cb"},
@@ -1856,7 +1856,7 @@ description = ".. image:: https://travis-ci.org/chtd/psycopg2cffi.svg?branch=mas
optional = true
python-versions = "*"
groups = ["main"]
markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")"
markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")"
files = [
{file = "psycopg2cffi-2.9.0.tar.gz", hash = "sha256:7e272edcd837de3a1d12b62185eb85c45a19feda9e62fa1b120c54f9e8d35c52"},
]
@@ -1872,7 +1872,7 @@ description = "A Simple library to enable psycopg2 compatability"
optional = true
python-versions = "*"
groups = ["main"]
markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")"
markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")"
files = [
{file = "psycopg2cffi-compat-1.1.tar.gz", hash = "sha256:d25e921748475522b33d13420aad5c2831c743227dc1f1f2585e0fdb5c914e05"},
]
@@ -2154,7 +2154,7 @@ description = "A development tool to measure, monitor and analyze the memory beh
optional = true
python-versions = ">=3.6"
groups = ["main"]
markers = "extra == \"all\" or extra == \"cache-memory\""
markers = "extra == \"cache-memory\" or extra == \"all\""
files = [
{file = "Pympler-1.0.1-py3-none-any.whl", hash = "sha256:d260dda9ae781e1eab6ea15bacb84015849833ba5555f141d2d9b7b7473b307d"},
{file = "Pympler-1.0.1.tar.gz", hash = "sha256:993f1a3599ca3f4fcd7160c7545ad06310c9e12f70174ae7ae8d4e25f6c5d3fa"},
@@ -2207,6 +2207,63 @@ typing-extensions = {version = ">=4.9", markers = "python_version < \"3.13\" and
docs = ["sphinx (!=5.2.0,!=5.2.0.post0,!=7.2.5)", "sphinx_rtd_theme"]
test = ["pretend", "pytest (>=3.0.1)", "pytest-rerunfailures"]
[[package]]
name = "pyparsing"
version = "3.2.5"
description = "pyparsing - Classes and methods to define and execute parsing grammars"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "pyparsing-3.2.5-py3-none-any.whl", hash = "sha256:e38a4f02064cf41fe6593d328d0512495ad1f3d8a91c4f73fc401b3079a59a5e"},
{file = "pyparsing-3.2.5.tar.gz", hash = "sha256:2df8d5b7b2802ef88e8d016a2eb9c7aeaa923529cd251ed0fe4608275d4105b6"},
]
[package.extras]
diagrams = ["jinja2", "railroad-diagrams"]
[[package]]
name = "pyrsistent"
version = "0.20.0"
description = "Persistent/Functional/Immutable data structures"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "pyrsistent-0.20.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:8c3aba3e01235221e5b229a6c05f585f344734bd1ad42a8ac51493d74722bbce"},
{file = "pyrsistent-0.20.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c1beb78af5423b879edaf23c5591ff292cf7c33979734c99aa66d5914ead880f"},
{file = "pyrsistent-0.20.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21cc459636983764e692b9eba7144cdd54fdec23ccdb1e8ba392a63666c60c34"},
{file = "pyrsistent-0.20.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f5ac696f02b3fc01a710427585c855f65cd9c640e14f52abe52020722bb4906b"},
{file = "pyrsistent-0.20.0-cp310-cp310-win32.whl", hash = "sha256:0724c506cd8b63c69c7f883cc233aac948c1ea946ea95996ad8b1380c25e1d3f"},
{file = "pyrsistent-0.20.0-cp310-cp310-win_amd64.whl", hash = "sha256:8441cf9616d642c475684d6cf2520dd24812e996ba9af15e606df5f6fd9d04a7"},
{file = "pyrsistent-0.20.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:0f3b1bcaa1f0629c978b355a7c37acd58907390149b7311b5db1b37648eb6958"},
{file = "pyrsistent-0.20.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5cdd7ef1ea7a491ae70d826b6cc64868de09a1d5ff9ef8d574250d0940e275b8"},
{file = "pyrsistent-0.20.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cae40a9e3ce178415040a0383f00e8d68b569e97f31928a3a8ad37e3fde6df6a"},
{file = "pyrsistent-0.20.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6288b3fa6622ad8a91e6eb759cfc48ff3089e7c17fb1d4c59a919769314af224"},
{file = "pyrsistent-0.20.0-cp311-cp311-win32.whl", hash = "sha256:7d29c23bdf6e5438c755b941cef867ec2a4a172ceb9f50553b6ed70d50dfd656"},
{file = "pyrsistent-0.20.0-cp311-cp311-win_amd64.whl", hash = "sha256:59a89bccd615551391f3237e00006a26bcf98a4d18623a19909a2c48b8e986ee"},
{file = "pyrsistent-0.20.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:09848306523a3aba463c4b49493a760e7a6ca52e4826aa100ee99d8d39b7ad1e"},
{file = "pyrsistent-0.20.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a14798c3005ec892bbada26485c2eea3b54109cb2533713e355c806891f63c5e"},
{file = "pyrsistent-0.20.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b14decb628fac50db5e02ee5a35a9c0772d20277824cfe845c8a8b717c15daa3"},
{file = "pyrsistent-0.20.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2e2c116cc804d9b09ce9814d17df5edf1df0c624aba3b43bc1ad90411487036d"},
{file = "pyrsistent-0.20.0-cp312-cp312-win32.whl", hash = "sha256:e78d0c7c1e99a4a45c99143900ea0546025e41bb59ebc10182e947cf1ece9174"},
{file = "pyrsistent-0.20.0-cp312-cp312-win_amd64.whl", hash = "sha256:4021a7f963d88ccd15b523787d18ed5e5269ce57aa4037146a2377ff607ae87d"},
{file = "pyrsistent-0.20.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:79ed12ba79935adaac1664fd7e0e585a22caa539dfc9b7c7c6d5ebf91fb89054"},
{file = "pyrsistent-0.20.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f920385a11207dc372a028b3f1e1038bb244b3ec38d448e6d8e43c6b3ba20e98"},
{file = "pyrsistent-0.20.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4f5c2d012671b7391803263419e31b5c7c21e7c95c8760d7fc35602353dee714"},
{file = "pyrsistent-0.20.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ef3992833fbd686ee783590639f4b8343a57f1f75de8633749d984dc0eb16c86"},
{file = "pyrsistent-0.20.0-cp38-cp38-win32.whl", hash = "sha256:881bbea27bbd32d37eb24dd320a5e745a2a5b092a17f6debc1349252fac85423"},
{file = "pyrsistent-0.20.0-cp38-cp38-win_amd64.whl", hash = "sha256:6d270ec9dd33cdb13f4d62c95c1a5a50e6b7cdd86302b494217137f760495b9d"},
{file = "pyrsistent-0.20.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:ca52d1ceae015859d16aded12584c59eb3825f7b50c6cfd621d4231a6cc624ce"},
{file = "pyrsistent-0.20.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b318ca24db0f0518630e8b6f3831e9cba78f099ed5c1d65ffe3e023003043ba0"},
{file = "pyrsistent-0.20.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fed2c3216a605dc9a6ea50c7e84c82906e3684c4e80d2908208f662a6cbf9022"},
{file = "pyrsistent-0.20.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2e14c95c16211d166f59c6611533d0dacce2e25de0f76e4c140fde250997b3ca"},
{file = "pyrsistent-0.20.0-cp39-cp39-win32.whl", hash = "sha256:f058a615031eea4ef94ead6456f5ec2026c19fb5bd6bfe86e9665c4158cf802f"},
{file = "pyrsistent-0.20.0-cp39-cp39-win_amd64.whl", hash = "sha256:58b8f6366e152092194ae68fefe18b9f0b4f89227dfd86a07770c3d86097aebf"},
{file = "pyrsistent-0.20.0-py3-none-any.whl", hash = "sha256:c55acc4733aad6560a7f5f818466631f07efc001fd023f34a6c203f8b6df0f0b"},
{file = "pyrsistent-0.20.0.tar.gz", hash = "sha256:4c48f78f62ab596c679086084d0dd13254ae4f3d6c72a83ffdf5ebdef8f265a4"},
]
[[package]]
name = "pysaml2"
version = "7.5.0"
@@ -2214,7 +2271,7 @@ description = "Python implementation of SAML Version 2 Standard"
optional = true
python-versions = ">=3.9,<4.0"
groups = ["main"]
markers = "extra == \"all\" or extra == \"saml2\""
markers = "extra == \"saml2\" or extra == \"all\""
files = [
{file = "pysaml2-7.5.0-py3-none-any.whl", hash = "sha256:bc6627cc344476a83c757f440a73fda1369f13b6fda1b4e16bca63ffbabb5318"},
{file = "pysaml2-7.5.0.tar.gz", hash = "sha256:f36871d4e5ee857c6b85532e942550d2cf90ea4ee943d75eb681044bbc4f54f7"},
@@ -2239,7 +2296,7 @@ description = "Extensions to the standard Python datetime module"
optional = true
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
groups = ["main"]
markers = "extra == \"all\" or extra == \"saml2\""
markers = "extra == \"saml2\" or extra == \"all\""
files = [
{file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
{file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"},
@@ -2267,7 +2324,7 @@ description = "World timezone definitions, modern and historical"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"all\" or extra == \"saml2\""
markers = "extra == \"saml2\" or extra == \"all\""
files = [
{file = "pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00"},
{file = "pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3"},
@@ -2671,7 +2728,7 @@ description = "Python client for Sentry (https://sentry.io)"
optional = true
python-versions = ">=3.6"
groups = ["main"]
markers = "extra == \"all\" or extra == \"sentry\""
markers = "extra == \"sentry\" or extra == \"all\""
files = [
{file = "sentry_sdk-2.46.0-py2.py3-none-any.whl", hash = "sha256:4eeeb60198074dff8d066ea153fa6f241fef1668c10900ea53a4200abc8da9b1"},
{file = "sentry_sdk-2.46.0.tar.gz", hash = "sha256:91821a23460725734b7741523021601593f35731808afc0bb2ba46c27b8acd91"},
@@ -2881,7 +2938,7 @@ description = "Tornado IOLoop Backed Concurrent Futures"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"all\" or extra == \"opentracing\""
markers = "extra == \"opentracing\" or extra == \"all\""
files = [
{file = "threadloop-1.0.2-py2-none-any.whl", hash = "sha256:5c90dbefab6ffbdba26afb4829d2a9df8275d13ac7dc58dccb0e279992679599"},
{file = "threadloop-1.0.2.tar.gz", hash = "sha256:8b180aac31013de13c2ad5c834819771992d350267bddb854613ae77ef571944"},
@@ -2897,7 +2954,7 @@ description = "Python bindings for the Apache Thrift RPC system"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"all\" or extra == \"opentracing\""
markers = "extra == \"opentracing\" or extra == \"all\""
files = [
{file = "thrift-0.16.0.tar.gz", hash = "sha256:2b5b6488fcded21f9d312aa23c9ff6a0195d0f6ae26ddbd5ad9e3e25dfc14408"},
]
@@ -2970,7 +3027,7 @@ description = "Tornado is a Python web framework and asynchronous networking lib
optional = true
python-versions = ">=3.9"
groups = ["main"]
markers = "extra == \"all\" or extra == \"opentracing\""
markers = "extra == \"opentracing\" or extra == \"all\""
files = [
{file = "tornado-6.5-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:f81067dad2e4443b015368b24e802d0083fecada4f0a4572fdb72fc06e54a9a6"},
{file = "tornado-6.5-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:9ac1cbe1db860b3cbb251e795c701c41d343f06a96049d6274e7c77559117e41"},
@@ -3104,7 +3161,7 @@ description = "non-blocking redis client for python"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"all\" or extra == \"redis\""
markers = "extra == \"redis\" or extra == \"all\""
files = [
{file = "txredisapi-1.4.11-py3-none-any.whl", hash = "sha256:ac64d7a9342b58edca13ef267d4fa7637c1aa63f8595e066801c1e8b56b22d0b"},
{file = "txredisapi-1.4.11.tar.gz", hash = "sha256:3eb1af99aefdefb59eb877b1dd08861efad60915e30ad5bf3d5bf6c5cedcdbc6"},
@@ -3350,7 +3407,7 @@ description = "An XML Schema validator and decoder"
optional = true
python-versions = ">=3.7"
groups = ["main"]
markers = "extra == \"all\" or extra == \"saml2\""
markers = "extra == \"saml2\" or extra == \"all\""
files = [
{file = "xmlschema-2.4.0-py3-none-any.whl", hash = "sha256:dc87be0caaa61f42649899189aab2fd8e0d567f2cf548433ba7b79278d231a4a"},
{file = "xmlschema-2.4.0.tar.gz", hash = "sha256:d74cd0c10866ac609e1ef94a5a69b018ad16e39077bc6393408b40c6babee793"},
@@ -3468,15 +3525,15 @@ docs = ["Sphinx", "repoze.sphinx.autointerface"]
test = ["zope.i18nmessageid", "zope.testing", "zope.testrunner"]
[extras]
all = ["authlib", "hiredis", "jaeger-client", "lxml", "matrix-synapse-ldap3", "opentracing", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pympler", "pysaml2", "sentry-sdk", "txredisapi"]
all = ["authlib", "defusedxml", "hiredis", "jaeger-client", "lxml", "matrix-synapse-ldap3", "opentracing", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pympler", "pysaml2", "pytz", "sentry-sdk", "thrift", "tornado", "txredisapi"]
cache-memory = ["pympler"]
jwt = ["authlib"]
matrix-synapse-ldap3 = ["matrix-synapse-ldap3"]
oidc = ["authlib"]
opentracing = ["jaeger-client", "opentracing"]
opentracing = ["jaeger-client", "opentracing", "thrift", "tornado"]
postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat"]
redis = ["hiredis", "txredisapi"]
saml2 = ["pysaml2"]
saml2 = ["defusedxml", "pysaml2", "pytz"]
sentry = ["sentry-sdk"]
systemd = ["systemd-python"]
test = ["idna", "parameterized"]
@@ -3485,4 +3542,4 @@ url-preview = ["lxml"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10.0,<4.0.0"
content-hash = "960ddae65fde8574f0f36b6988622fc4baf7646823c36699c5cd4773cad8b0ed"
content-hash = "1caa5072f6304122c89377420f993a54f54587f3618ccc8094ec31642264592c"

View File

@@ -42,7 +42,8 @@ dependencies = [
"Twisted[tls]>=21.2.0",
"treq>=21.5.0",
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
"pyOpenSSL>=16.0.0",
# pyOpenSSL 16.2.0 fixes compatibility with OpenSSL 1.1.0.
"pyOpenSSL>=16.2.0",
"PyYAML>=5.3",
"pyasn1>=0.1.9",
"pyasn1-modules>=0.0.7",
@@ -95,6 +96,25 @@ dependencies = [
# This is used for parsing multipart responses
"python-multipart>=0.0.9",
# Transitive dependency constraints
# These dependencies aren't directly required by Synapse.
# However, in order for Synapse to build, Synapse requires a higher minimum version
# for these dependencies than the minimum specified by the direct dependency.
# We should periodically check to see if these dependencies are still necessary and
# remove any that are no longer required.
"cffi>=1.15", # via cryptography
"pynacl>=1.3", # via signedjson
"pyparsing>=2.4", # via packaging
"pyrsistent>=0.18.0", # via jsonschema
"requests>=2.16.0", # 2.16.0+ no longer vendors urllib3, avoiding Python 3.10+ incompatibility
"urllib3>=1.26.5", # via treq; 1.26.5 fixes Python 3.10+ collections.abc compatibility
# 5.2 is the current version in Debian oldstable. If we don't care to support that, then 5.4 is
# the minimum version from Ubuntu 22.04 and RHEL 9. (as of 2025-12)
# When bumping this version to 6.2 or above, refer to https://github.com/element-hq/synapse/pull/19274
# for details of Synapse improvements that may be unlocked, particularly around the use of `|`
# syntax with zope interface types.
"zope-interface>=5.2", # via twisted
]
[project.optional-dependencies]
@@ -104,7 +124,16 @@ postgres = [
"psycopg2cffi>=2.8;platform_python_implementation == 'PyPy'",
"psycopg2cffi-compat==1.1;platform_python_implementation == 'PyPy'",
]
saml2 = ["pysaml2>=4.5.0"]
saml2 = [
"pysaml2>=4.5.0",
# Transitive dependencies from pysaml2
# These dependencies aren't directly required by Synapse.
# However, in order for Synapse to build, Synapse requires a higher minimum version
# for these dependencies than the minimum specified by the direct dependency.
"defusedxml>=0.7.1", # via pysaml2
"pytz>=2018.3", # via pysaml2
]
oidc = ["authlib>=0.15.1"]
# systemd-python is necessary for logging to the systemd journal via
# `systemd.journal.JournalHandler`, as is documented in
@@ -112,13 +141,23 @@ oidc = ["authlib>=0.15.1"]
systemd = ["systemd-python>=231"]
url-preview = ["lxml>=4.6.3"]
sentry = ["sentry-sdk>=0.7.2"]
opentracing = ["jaeger-client>=4.2.0", "opentracing>=2.2.0"]
opentracing = [
"jaeger-client>=4.2.0",
"opentracing>=2.2.0",
# Transitive dependencies from jaeger-client
# These dependencies aren't directly required by Synapse.
# However, in order for Synapse to build, Synapse requires a higher minimum version
# for these dependencies than the minimum specified by the direct dependency.
"thrift>=0.10", # via jaeger-client
"tornado>=6.0", # via jaeger-client
]
jwt = ["authlib"]
# hiredis is not a *strict* dependency, but it makes things much faster.
# (if it is not installed, we fall back to slow code.)
redis = ["txredisapi>=1.4.7", "hiredis"]
redis = ["txredisapi>=1.4.7", "hiredis>=0.3"]
# Required to use experimental `caches.track_memory_usage` config option.
cache-memory = ["pympler"]
cache-memory = ["pympler>=1.0"]
# If this is updated, don't forget to update the equivalent lines in
# `dependency-groups.dev` below.
test = ["parameterized>=0.9.0", "idna>=3.3"]
@@ -149,12 +188,22 @@ all = [
# opentracing
"jaeger-client>=4.2.0", "opentracing>=2.2.0",
# redis
"txredisapi>=1.4.7", "hiredis",
"txredisapi>=1.4.7", "hiredis>=0.3",
# cache-memory
"pympler",
# 1.0 added support for python 3.10, our current minimum supported python version
"pympler>=1.0",
# omitted:
# - test: it's useful to have this separate from dev deps in the olddeps job
# - systemd: this is a system-based requirement
# Transitive dependencies
# These dependencies aren't directly required by Synapse.
# However, in order for Synapse to build, Synapse requires a higher minimum version
# for these dependencies than the minimum specified by the direct dependency.
"defusedxml>=0.7.1", # via pysaml2
"pytz>=2018.3", # via pysaml2
"thrift>=0.10", # via jaeger-client
"tornado>=6.0", # via jaeger-client
]
[project.urls]
@@ -339,15 +388,10 @@ select = [
"G",
# pyupgrade
"UP006",
"UP007",
"UP045",
]
extend-safe-fixes = [
# pyupgrade rules compatible with Python >= 3.9
"UP006",
"UP007",
# pyupgrade rules compatible with Python >= 3.10
"UP045",
# Allow ruff to automatically fix trailing spaces within a multi-line string/comment.
"W293"
]
@@ -427,9 +471,6 @@ skip = "cp3??t-* *i686* *macosx*"
enable = "pypy"
# We need a rust compiler.
#
# We temporarily pin Rust to 1.82.0 to work around
# https://github.com/element-hq/synapse/issues/17988
before-all = "sh .ci/before_build_wheel.sh"
environment= { PATH = "$PATH:$HOME/.cargo/bin" }

View File

@@ -14,7 +14,6 @@ import sqlglot.expressions
SCHEMA_FILE_REGEX = re.compile(r"^synapse/storage/schema/(.*)/delta/(.*)/(.*)$")
# The base branch we want to check against. We use the main development branch
# on the assumption that it is what we are developing against.
DEVELOP_BRANCH = "develop"

View File

@@ -145,7 +145,7 @@ def request(
print("Requesting %s" % dest, file=sys.stderr)
s = requests.Session()
s.mount("matrix-federation://", MatrixConnectionAdapter())
s.mount("matrix-federation://", MatrixConnectionAdapter(verify_tls=verify_tls))
headers: dict[str, str] = {
"Authorization": authorization_headers[0],
@@ -267,6 +267,17 @@ def read_args_from_config(args: argparse.Namespace) -> None:
class MatrixConnectionAdapter(HTTPAdapter):
"""
A Matrix federation-aware HTTP Adapter.
"""
verify_tls: bool
"""whether to verify the remote server's TLS certificate."""
def __init__(self, verify_tls: bool = True) -> None:
self.verify_tls = verify_tls
super().__init__()
def send(
self,
request: PreparedRequest,
@@ -280,7 +291,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
assert isinstance(request.url, str)
parsed = urlparse.urlsplit(request.url)
server_name = parsed.netloc
well_known = self._get_well_known(parsed.netloc)
well_known = self._get_well_known(parsed.netloc, verify_tls=self.verify_tls)
if well_known:
server_name = well_known
@@ -318,6 +329,21 @@ class MatrixConnectionAdapter(HTTPAdapter):
print(
f"Connecting to {host}:{port} with SNI {ssl_server_name}", file=sys.stderr
)
if proxies:
scheme = parsed.scheme
if isinstance(scheme, bytes):
scheme = scheme.decode("utf-8")
proxy_for_scheme = proxies.get(scheme)
if proxy_for_scheme:
return self.proxy_manager_for(proxy_for_scheme).connection_from_host(
host,
port=port,
scheme="https",
pool_kwargs={"server_hostname": ssl_server_name},
)
return self.poolmanager.connection_from_host(
host,
port=port,
@@ -368,7 +394,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
return server_name, 8448, server_name
@staticmethod
def _get_well_known(server_name: str) -> str | None:
def _get_well_known(server_name: str, verify_tls: bool = True) -> str | None:
if ":" in server_name:
# explicit port, or ipv6 literal. Either way, no .well-known
return None
@@ -379,7 +405,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
print(f"fetching {uri}", file=sys.stderr)
try:
resp = requests.get(uri)
resp = requests.get(uri, verify=verify_tls)
if resp.status_code != 200:
print("%s gave %i" % (uri, resp.status_code), file=sys.stderr)
return None

View File

@@ -36,6 +36,7 @@ from typing import (
Awaitable,
Callable,
NoReturn,
Optional,
cast,
)
from wsgiref.simple_server import WSGIServer
@@ -455,7 +456,7 @@ def listen_http(
root_resource: Resource,
version_string: str,
max_request_body_size: int,
context_factory: IOpenSSLContextFactory | None,
context_factory: Optional[IOpenSSLContextFactory],
reactor: ISynapseReactor = reactor,
) -> list[Port]:
"""

View File

@@ -24,7 +24,7 @@ import logging
import os
import sys
import tempfile
from typing import Mapping, Sequence
from typing import Mapping, Optional, Sequence
from twisted.internet import defer, task
@@ -291,7 +291,7 @@ def load_config(argv_options: list[str]) -> tuple[HomeServerConfig, argparse.Nam
def create_homeserver(
config: HomeServerConfig,
reactor: ISynapseReactor | None = None,
reactor: Optional[ISynapseReactor] = None,
) -> AdminCmdServer:
"""
Create a homeserver instance for the Synapse admin command process.

View File

@@ -21,6 +21,7 @@
#
import logging
import sys
from typing import Optional
from twisted.web.resource import Resource
@@ -335,7 +336,7 @@ def load_config(argv_options: list[str]) -> HomeServerConfig:
def create_homeserver(
config: HomeServerConfig,
reactor: ISynapseReactor | None = None,
reactor: Optional[ISynapseReactor] = None,
) -> GenericWorkerServer:
"""
Create a homeserver instance for the Synapse worker process.

View File

@@ -22,7 +22,7 @@
import logging
import os
import sys
from typing import Iterable
from typing import Iterable, Optional
from twisted.internet.tcp import Port
from twisted.web.resource import EncodingResourceWrapper, Resource
@@ -350,7 +350,7 @@ def load_or_generate_config(argv_options: list[str]) -> HomeServerConfig:
def create_homeserver(
config: HomeServerConfig,
reactor: ISynapseReactor | None = None,
reactor: Optional[ISynapseReactor] = None,
) -> SynapseHomeServer:
"""
Create a homeserver instance for the Synapse main process.

View File

@@ -13,7 +13,7 @@
#
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
from twisted.internet.interfaces import IDelayedCall
@@ -74,7 +74,7 @@ class DelayedEventsHandler:
cfg=self._config.ratelimiting.rc_delayed_event_mgmt,
)
self._next_delayed_event_call: IDelayedCall | None = None
self._next_delayed_event_call: Optional[IDelayedCall] = None
# The current position in the current_state_delta stream
self._event_pos: int | None = None

View File

@@ -22,7 +22,7 @@
import logging
import random
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence
from canonicaljson import encode_canonical_json
@@ -111,7 +111,7 @@ class MessageHandler:
# The scheduled call to self._expire_event. None if no call is currently
# scheduled.
self._scheduled_expiry: IDelayedCall | None = None
self._scheduled_expiry: Optional[IDelayedCall] = None
if not hs.config.worker.worker_app:
self.hs.run_as_background_process(

View File

@@ -17,6 +17,7 @@ import logging
from itertools import chain
from typing import TYPE_CHECKING, AbstractSet, Mapping
import attr
from prometheus_client import Histogram
from typing_extensions import assert_never
@@ -62,6 +63,7 @@ from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
RoomLazyMembershipChanges,
RoomSyncConfig,
SlidingSyncConfig,
SlidingSyncResult,
@@ -106,7 +108,7 @@ class SlidingSyncHandler:
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
self.is_mine_id = hs.is_mine_id
self.connection_store = SlidingSyncConnectionStore(self.store)
self.connection_store = SlidingSyncConnectionStore(self.clock, self.store)
self.extensions = SlidingSyncExtensionHandler(hs)
self.room_lists = SlidingSyncRoomLists(hs)
@@ -981,14 +983,15 @@ class SlidingSyncHandler:
#
# Calculate the `StateFilter` based on the `required_state` for the room
required_state_filter = StateFilter.none()
# The requested `required_state_map` with the lazy membership expanded and
# `$ME` replaced with the user's ID. This allows us to see what membership we've
# sent down to the client in the next request.
#
# Make a copy so we can modify it. Still need to be careful to make a copy of
# the state key sets if we want to add/remove from them. We could make a deep
# copy but this saves us some work.
expanded_required_state_map = dict(room_sync_config.required_state_map)
# Keep track of which users' state we may need to fetch. We split this
# into explicit users and lazy loaded users.
explicit_user_state = set()
lazy_load_user_ids = set()
# Whether lazy-loading of room members is enabled.
lazy_load_room_members = False
if room_membership_for_user_at_to_token.membership not in (
Membership.INVITE,
Membership.KNOCK,
@@ -1036,7 +1039,6 @@ class SlidingSyncHandler:
else:
required_state_types: list[tuple[str, str | None]] = []
num_wild_state_keys = 0
lazy_load_room_members = False
num_others = 0
for (
state_type,
@@ -1068,43 +1070,60 @@ class SlidingSyncHandler:
timeline_event.state_key
)
# The client needs to know the membership of everyone in
# the timeline we're returning.
lazy_load_user_ids.update(timeline_membership)
# Update the required state filter so we pick up the new
# membership
for user_id in timeline_membership:
required_state_types.append(
(EventTypes.Member, user_id)
)
if limited or initial:
# If the timeline is limited, we only need to
# return the membership changes for people in
# the timeline.
for user_id in timeline_membership:
required_state_types.append(
(EventTypes.Member, user_id)
)
else:
# For non-limited timelines we always return all
# membership changes. This is so that clients
# who have fetched the full membership list
# already can continue to maintain it for
# non-limited syncs.
#
# This assumes that for non-limited syncs there
# won't be many membership changes that wouldn't
# have been included already (this can only
# happen if membership state was rolled back due
# to state resolution anyway).
#
# `None` is a wildcard in the `StateFilter`
required_state_types.append((EventTypes.Member, None))
# Add an explicit entry for each user in the timeline
#
# Make a new set or copy of the state key set so we can
# modify it without affecting the original
# `required_state_map`
expanded_required_state_map[EventTypes.Member] = (
expanded_required_state_map.get(
EventTypes.Member, set()
# Record the extra members we're returning.
lazy_load_user_ids.update(
state_key
for event_type, state_key in room_state_delta_id_map
if event_type == EventTypes.Member
)
| timeline_membership
)
elif state_key == StateValues.ME:
else:
num_others += 1
required_state_types.append((state_type, user.to_string()))
# Replace `$ME` with the user's ID so we can deduplicate
# when someone requests the same state with `$ME` or with
# their user ID.
#
# Make a new set or copy of the state key set so we can
# modify it without affecting the original
# `required_state_map`
expanded_required_state_map[EventTypes.Member] = (
expanded_required_state_map.get(
EventTypes.Member, set()
)
| {user.to_string()}
normalized_state_key = state_key
if state_key == StateValues.ME:
normalized_state_key = user.to_string()
if state_type == EventTypes.Member:
# Also track explicitly requested member state for
# lazy membership tracking.
explicit_user_state.add(normalized_state_key)
required_state_types.append(
(state_type, normalized_state_key)
)
else:
num_others += 1
required_state_types.append((state_type, state_key))
set_tag(
SynapseTags.FUNC_ARG_PREFIX
@@ -1122,6 +1141,10 @@ class SlidingSyncHandler:
required_state_filter = StateFilter.from_types(required_state_types)
# Remove any explicitly requested user state from the lazy-loaded set,
# as we track them separately.
lazy_load_user_ids -= explicit_user_state
# We need this base set of info for the response so let's just fetch it along
# with the `required_state` for the room
hero_room_state = [
@@ -1149,6 +1172,22 @@ class SlidingSyncHandler:
# We can return all of the state that was requested if this was the first
# time we've sent the room down this connection.
room_state: StateMap[EventBase] = {}
# Includes the state for the heroes if we need them (may contain other
# state as well).
hero_membership_state: StateMap[EventBase] = {}
# By default, we mark all `lazy_load_user_ids` as being sent down
# for the first time in this sync. We later check if we sent any of them
# down previously and update `returned_user_id_to_last_seen_ts_map` if
# we have.
returned_user_id_to_last_seen_ts_map = {}
if lazy_load_room_members:
returned_user_id_to_last_seen_ts_map = dict.fromkeys(lazy_load_user_ids)
new_connection_state.room_lazy_membership[room_id] = RoomLazyMembershipChanges(
returned_user_id_to_last_seen_ts_map=returned_user_id_to_last_seen_ts_map
)
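`dict.fromkeys` above seeds every lazy-loaded user with a `None` timestamp, meaning "first sent down in this sync"; the later `update` overwrites entries for users whose membership was already returned on a previous sync. A small illustration of that two-step pattern (user IDs and the timestamp are made up):

```python
lazy_load_user_ids = {"@a:hs", "@b:hs", "@c:hs"}

# Step 1: assume everyone is being sent down for the first time.
returned = dict.fromkeys(lazy_load_user_ids)  # every value starts as None
assert returned == {"@a:hs": None, "@b:hs": None, "@c:hs": None}

# Step 2: users we had previously returned get their stored last-seen
# timestamp back, restricted to the users needed this sync.
previously_returned = {"@b:hs": 1700000000000, "@z:hs": 1600000000000}
returned.update(
    (u, ts) for u, ts in previously_returned.items() if u in lazy_load_user_ids
)
assert returned["@b:hs"] == 1700000000000
assert returned["@a:hs"] is None  # still a first-time entry
```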
if initial:
room_state = await self.get_current_state_at(
room_id=room_id,
@@ -1156,28 +1195,97 @@ class SlidingSyncHandler:
state_filter=state_filter,
to_token=to_token,
)
# The `room_state` includes the hero membership state if needed.
# We'll later filter this down so we don't need to do so here.
hero_membership_state = room_state
else:
assert from_token is not None
assert from_bound is not None
if prev_room_sync_config is not None:
# Define `all_required_user_state` as all user state we want, which
# is the explicitly requested members, any needed for lazy
# loading, and users whose membership has changed.
all_required_user_state = explicit_user_state | lazy_load_user_ids
for state_type, state_key in room_state_delta_id_map:
if state_type == EventTypes.Member:
all_required_user_state.add(state_key)
# We need to know what user state we previously sent down the
# connection so we can determine what has changed.
#
# We need to fetch all users whose memberships we may want
# to send down this sync. This includes (and matches
# `all_required_user_state`):
# 1. Explicitly requested user state
# 2. Lazy loaded members, i.e. users who appear in the
# timeline.
# 3. The users whose membership has changed in the room, i.e.
# in the state deltas.
#
# This is to correctly handle the cases where a user was
# previously sent down as a lazy loaded member:
# - and is now explicitly requested (so shouldn't be sent down
# again); or
# - their membership has changed (so we need to invalidate
# their entry in the lazy loaded table if we don't send the
# change down).
if all_required_user_state:
previously_returned_user_to_last_seen = (
await self.store.get_sliding_sync_connection_lazy_members(
connection_position=from_token.connection_position,
room_id=room_id,
user_ids=all_required_user_state,
)
)
# Update the room lazy membership changes to track which
# lazy loaded members were needed for this sync. This is so
# that we can correctly track the last time we sent down
# users' membership (and so can evict old membership state
# from the DB tables).
returned_user_id_to_last_seen_ts_map.update(
(user_id, timestamp)
for user_id, timestamp in previously_returned_user_to_last_seen.items()
if user_id in lazy_load_user_ids
)
else:
previously_returned_user_to_last_seen = {}
# Check if there are any changes to the required state config
# that we need to handle.
changed_required_state_map, added_state_filter = (
_required_state_changes(
user.to_string(),
prev_required_state_map=prev_room_sync_config.required_state_map,
request_required_state_map=expanded_required_state_map,
state_deltas=room_state_delta_id_map,
)
changes_return = _required_state_changes(
user.to_string(),
prev_required_state_map=prev_room_sync_config.required_state_map,
request_required_state_map=room_sync_config.required_state_map,
previously_returned_lazy_user_ids=previously_returned_user_to_last_seen.keys(),
request_lazy_load_user_ids=lazy_load_user_ids,
state_deltas=room_state_delta_id_map,
)
changed_required_state_map = changes_return.changed_required_state_map
if added_state_filter:
new_connection_state.room_lazy_membership[
room_id
].invalidated_user_ids = changes_return.lazy_members_invalidated
# Add any previously returned explicit memberships to the lazy
# loaded table. This happens when a client requested explicit
# members and then converted them to lazy loading.
for user_id in changes_return.extra_users_to_add_to_lazy_cache:
# We don't know the right timestamp to use here, as we don't
# know the last time we would have sent the membership down.
# So we don't overwrite it if we have a timestamp already,
# and fall back to `None` (which means now) if we don't.
returned_user_id_to_last_seen_ts_map.setdefault(user_id, None)
if changes_return.added_state_filter:
# Some state entries got added, so we pull out the current
# state for them. If we don't do this we'd only send down new deltas.
state_ids = await self.get_current_state_ids_at(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
state_filter=added_state_filter,
state_filter=changes_return.added_state_filter,
to_token=to_token,
)
room_state_delta_id_map.update(state_ids)
@@ -1189,6 +1297,7 @@ class SlidingSyncHandler:
# If the membership changed and we have to get heroes, get the remaining
# heroes from the state
hero_membership_state = {}
if hero_user_ids:
hero_membership_state = await self.get_current_state_at(
room_id=room_id,
@@ -1196,7 +1305,6 @@ class SlidingSyncHandler:
state_filter=StateFilter.from_types(hero_room_state),
to_token=to_token,
)
room_state.update(hero_membership_state)
required_room_state: StateMap[EventBase] = {}
if required_state_filter != StateFilter.none():
@@ -1219,7 +1327,7 @@ class SlidingSyncHandler:
# Assemble heroes: extract the info from the state we just fetched
heroes: list[SlidingSyncResult.RoomResult.StrippedHero] = []
for hero_user_id in hero_user_ids:
member_event = room_state.get((EventTypes.Member, hero_user_id))
member_event = hero_membership_state.get((EventTypes.Member, hero_user_id))
if member_event is not None:
heroes.append(
SlidingSyncResult.RoomResult.StrippedHero(
@@ -1281,7 +1389,7 @@ class SlidingSyncHandler:
bump_stamp = 0
room_sync_required_state_map_to_persist: Mapping[str, AbstractSet[str]] = (
expanded_required_state_map
room_sync_config.required_state_map
)
if changed_required_state_map:
room_sync_required_state_map_to_persist = changed_required_state_map
@@ -1471,13 +1579,37 @@ class SlidingSyncHandler:
return None
@attr.s(auto_attribs=True)
class _RequiredStateChangesReturn:
"""Return type for _required_state_changes."""
changed_required_state_map: Mapping[str, AbstractSet[str]] | None
"""The updated required state map to store in the room config, or None if
there is no change."""
added_state_filter: StateFilter
"""The state filter to use to fetch any additional current state that needs
to be returned to the client."""
extra_users_to_add_to_lazy_cache: AbstractSet[str] = frozenset()
"""The set of user IDs we should add to the lazy members cache that we had
previously returned. Handles the case where a user was previously sent down
explicitly but is now being lazy loaded."""
lazy_members_invalidated: AbstractSet[str] = frozenset()
"""The set of user IDs whose membership has changed but which we did not send
down, so we need to invalidate them in the cache."""
def _required_state_changes(
user_id: str,
*,
prev_required_state_map: Mapping[str, AbstractSet[str]],
request_required_state_map: Mapping[str, AbstractSet[str]],
previously_returned_lazy_user_ids: AbstractSet[str],
request_lazy_load_user_ids: AbstractSet[str],
state_deltas: StateMap[str],
) -> tuple[Mapping[str, AbstractSet[str]] | None, StateFilter]:
) -> _RequiredStateChangesReturn:
"""Calculates the changes between the required state room config from the
previous requests compared with the current request.
@@ -1491,14 +1623,62 @@ def _required_state_changes(
added, removed and then added again to the required state. In that case we
only want to re-send that entry down sync if it has changed.
Returns:
A 2-tuple of updated required state config (or None if there is no update)
and the state filter to use to fetch extra current state that we need to
return.
Args:
user_id: The user ID of the user making the request.
prev_required_state_map: The required state map from the previous
request.
request_required_state_map: The required state map from the current
request.
previously_returned_lazy_user_ids: The set of user IDs whose membership
we have previously returned to the client due to lazy loading. This
is filtered to only include users who sent events in the `timeline`,
appear in `required_state`, or whose membership changed.
request_lazy_load_user_ids: The set of user IDs whose lazy-loaded
membership is required for this request.
state_deltas: The state deltas in the room in the request token range,
considering user membership. See `get_current_state_deltas_for_room`
for more details.
"""
# First we find any lazy members that have been invalidated due to state
# changes that we are not sending down.
lazy_members_invalidated = set()
for event_type, state_key in state_deltas:
if event_type != EventTypes.Member:
continue
if state_key in request_lazy_load_user_ids:
# Because it's part of the `request_lazy_load_user_ids`, we're going to
# send this member change down.
continue
if state_key not in previously_returned_lazy_user_ids:
# We've not previously returned this member so nothing to
# invalidate.
continue
lazy_members_invalidated.add(state_key)
if prev_required_state_map == request_required_state_map:
# There has been no change. Return immediately.
return None, StateFilter.none()
# There has been no change in state, just need to check lazy members.
newly_returned_lazy_members = (
request_lazy_load_user_ids - previously_returned_lazy_user_ids
)
if newly_returned_lazy_members:
# There are some new lazy members we need to fetch.
added_types: list[tuple[str, str | None]] = []
for new_user_id in newly_returned_lazy_members:
added_types.append((EventTypes.Member, new_user_id))
added_state_filter = StateFilter.from_types(added_types)
else:
added_state_filter = StateFilter.none()
return _RequiredStateChangesReturn(
changed_required_state_map=None,
added_state_filter=added_state_filter,
lazy_members_invalidated=lazy_members_invalidated,
)
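The invalidation pass at the top of `_required_state_changes` is pure set logic over the member-state deltas. A standalone sketch of the same filtering (the `invalidated_lazy_members` helper is hypothetical, and the event type is written as a plain string rather than `EventTypes.Member`):

```python
MEMBER = "m.room.member"

def invalidated_lazy_members(
    state_deltas: dict[tuple[str, str], str],
    request_lazy_load_user_ids: set[str],
    previously_returned_lazy_user_ids: set[str],
) -> set[str]:
    """Members whose state changed and whom we previously sent down, but who
    are not being sent down this time: their cache entries must be invalidated."""
    invalidated = set()
    for event_type, state_key in state_deltas:
        if event_type != MEMBER:
            continue
        if state_key in request_lazy_load_user_ids:
            continue  # will be sent down this sync anyway
        if state_key not in previously_returned_lazy_user_ids:
            continue  # never sent down, nothing to invalidate
        invalidated.add(state_key)
    return invalidated


deltas = {
    (MEMBER, "@a:hs"): "$e1",
    (MEMBER, "@b:hs"): "$e2",
    ("m.room.topic", ""): "$e3",  # non-member deltas are ignored
}
assert invalidated_lazy_members(deltas, {"@a:hs"}, {"@a:hs", "@b:hs"}) == {"@b:hs"}
```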
prev_wildcard = prev_required_state_map.get(StateValues.WILDCARD, set())
request_wildcard = request_required_state_map.get(StateValues.WILDCARD, set())
@@ -1508,17 +1688,29 @@ def _required_state_changes(
# already fetching everything, we don't have to fetch anything now that they've
# narrowed.
if StateValues.WILDCARD in prev_wildcard:
return request_required_state_map, StateFilter.none()
return _RequiredStateChangesReturn(
changed_required_state_map=request_required_state_map,
added_state_filter=StateFilter.none(),
lazy_members_invalidated=lazy_members_invalidated,
)
# If an event type wildcard has been added or removed we don't try to do
# anything fancy, and instead always update the effective room required
# state config to match the request.
if request_wildcard - prev_wildcard:
# Some keys were added, so we need to fetch everything
return request_required_state_map, StateFilter.all()
return _RequiredStateChangesReturn(
changed_required_state_map=request_required_state_map,
added_state_filter=StateFilter.all(),
lazy_members_invalidated=lazy_members_invalidated,
)
if prev_wildcard - request_wildcard:
# Keys were only removed, so we don't have to fetch everything.
return request_required_state_map, StateFilter.none()
return _RequiredStateChangesReturn(
changed_required_state_map=request_required_state_map,
added_state_filter=StateFilter.none(),
lazy_members_invalidated=lazy_members_invalidated,
)
# Contains updates to the required state map compared with the previous room
# config. This has the same format as `RoomSyncConfig.required_state`
@@ -1550,6 +1742,17 @@ def _required_state_changes(
# Nothing *added*, so we skip. Removals happen below.
continue
# Handle the special case of adding `$LAZY` membership, where we want to
# always record the change to be lazy loading, as we immediately start
# using the lazy loading tables so there is no point *not* recording the
# change to lazy load in the effective room config.
if event_type == EventTypes.Member:
old_state_key_lazy = StateValues.LAZY in old_state_keys
request_state_key_lazy = StateValues.LAZY in request_state_keys
if not old_state_key_lazy and request_state_key_lazy:
changes[event_type] = request_state_keys
continue
# We only remove state keys from the effective state if they've been
# removed from the request *and* the state has changed. This ensures
# that if a client removes and then re-adds a state key, we only send
@@ -1620,9 +1823,31 @@ def _required_state_changes(
# LAZY values should also be ignored for event types that are
# not membership.
pass
elif event_type == EventTypes.Member:
if state_key not in previously_returned_lazy_user_ids:
# Only add *explicit* members we haven't previously sent
# down.
added.append((event_type, state_key))
else:
added.append((event_type, state_key))
previously_required_state_members = set(
prev_required_state_map.get(EventTypes.Member, ())
)
if StateValues.ME in previously_required_state_members:
previously_required_state_members.add(user_id)
# We also need to pull out any lazy members that are now required but
# haven't previously been returned.
for required_user_id in (
request_lazy_load_user_ids
# Remove previously returned users
- previously_returned_lazy_user_ids
# Exclude previously explicitly requested members.
- previously_required_state_members
):
added.append((EventTypes.Member, required_user_id))
added_state_filter = StateFilter.from_types(added)
# Figure out what changes we need to apply to the effective required state
@@ -1663,13 +1888,25 @@ def _required_state_changes(
changes[event_type] = request_state_keys
continue
# When handling $LAZY membership, we want to either a) not update the
# state or b) update it to match the request. This is to avoid churn of
# the effective required state for rooms (we deduplicate required state
# between rooms), and because we can store the previously returned
# explicit memberships with the lazy loaded memberships.
if event_type == EventTypes.Member:
old_state_key_lazy = StateValues.LAZY in old_state_keys
request_state_key_lazy = StateValues.LAZY in request_state_keys
has_lazy = old_state_key_lazy or request_state_key_lazy
# If a "$LAZY" has been added or removed we always update to match
# the request.
if old_state_key_lazy != request_state_key_lazy:
# If a "$LAZY" has been added or removed we always update the effective room
# required state config to match the request.
changes[event_type] = request_state_keys
continue
# Or if we have lazy membership and there are invalidated
# explicit memberships.
if has_lazy and invalidated_state_keys:
changes[event_type] = request_state_keys
continue
@@ -1684,6 +1921,28 @@ def _required_state_changes(
if invalidated_state_keys:
changes[event_type] = old_state_keys - invalidated_state_keys
# Check for any explicit membership changes that were removed that we can
# add to the lazy members previously returned. This is so that we don't
# return a user due to lazy loading if they were previously returned as an
# explicit membership.
users_to_add_to_lazy_cache: set[str] = set()
membership_changes = changes.get(EventTypes.Member, set())
if membership_changes and StateValues.LAZY in request_state_keys:
for state_key in prev_required_state_map.get(EventTypes.Member, set()):
if state_key == StateValues.WILDCARD or state_key == StateValues.LAZY:
# Ignore non-user IDs.
continue
if state_key == StateValues.ME:
# Normalize to proper user ID
state_key = user_id
# We remember the user if they haven't been invalidated
if (EventTypes.Member, state_key) not in state_deltas:
users_to_add_to_lazy_cache.add(state_key)
new_required_state_map = None
if changes:
# Update the required state config based on the changes.
new_required_state_map = dict(prev_required_state_map)
@@ -1694,6 +1953,9 @@ def _required_state_changes(
# Remove entries with empty state keys.
new_required_state_map.pop(event_type, None)
return new_required_state_map, added_state_filter
else:
return None, added_state_filter
return _RequiredStateChangesReturn(
changed_required_state_map=new_required_state_map,
added_state_filter=added_state_filter,
lazy_members_invalidated=lazy_members_invalidated,
extra_users_to_add_to_lazy_cache=users_to_add_to_lazy_cache,
)

View File

@@ -13,7 +13,6 @@
#
import logging
from typing import TYPE_CHECKING
import attr
@@ -25,9 +24,7 @@ from synapse.types.handlers.sliding_sync import (
PerConnectionState,
SlidingSyncConfig,
)
if TYPE_CHECKING:
pass
from synapse.util.clock import Clock
logger = logging.getLogger(__name__)
@@ -61,7 +58,8 @@ class SlidingSyncConnectionStore:
to mapping of room ID to `HaveSentRoom`.
"""
store: "DataStore"
clock: Clock
store: DataStore
async def get_and_clear_connection_positions(
self,
@@ -101,7 +99,7 @@ class SlidingSyncConnectionStore:
If there are no changes to the state this may return the same token as
the existing per-connection state.
"""
if not new_connection_state.has_updates():
if not new_connection_state.has_updates(self.clock):
if from_token is not None:
return from_token.connection_position
else:

View File

@@ -21,7 +21,7 @@
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
from twisted.internet.interfaces import IDelayedCall
@@ -125,7 +125,7 @@ class UserDirectoryHandler(StateDeltasHandler):
# Guard to ensure we only have one process for refreshing remote profiles
self._is_refreshing_remote_profiles = False
# Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process`
self._refresh_remote_profiles_call_later: IDelayedCall | None = None
self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None
# Guard to ensure we only have one process for refreshing remote profiles
# for the given servers.

View File

@@ -28,6 +28,7 @@ from typing import (
BinaryIO,
Callable,
Mapping,
Optional,
Protocol,
)
@@ -313,7 +314,7 @@ class BlocklistingAgentWrapper(Agent):
method: bytes,
uri: bytes,
headers: Headers | None = None,
bodyProducer: IBodyProducer | None = None,
bodyProducer: Optional[IBodyProducer] = None,
) -> defer.Deferred:
h = urllib.parse.urlparse(uri.decode("ascii"))
@@ -1033,7 +1034,7 @@ class BodyExceededMaxSize(Exception):
class _DiscardBodyWithMaxSizeProtocol(protocol.Protocol):
"""A protocol which immediately errors upon receiving data."""
transport: ITCPTransport | None = None
transport: Optional[ITCPTransport] = None
def __init__(self, deferred: defer.Deferred):
self.deferred = deferred
@@ -1075,7 +1076,7 @@ class _MultipartParserProtocol(protocol.Protocol):
Protocol to read and parse a MSC3916 multipart/mixed response
"""
transport: ITCPTransport | None = None
transport: Optional[ITCPTransport] = None
def __init__(
self,
@@ -1188,7 +1189,7 @@ class _MultipartParserProtocol(protocol.Protocol):
class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
"""A protocol which reads body to a stream, erroring if the body exceeds a maximum size."""
transport: ITCPTransport | None = None
transport: Optional[ITCPTransport] = None
def __init__(
self, stream: ByteWriteable, deferred: defer.Deferred, max_size: int | None

View File

@@ -19,7 +19,7 @@
#
import logging
import urllib.parse
from typing import Any, Generator
from typing import Any, Generator, Optional
from urllib.request import ( # type: ignore[attr-defined]
proxy_bypass_environment,
)
@@ -173,7 +173,7 @@ class MatrixFederationAgent:
method: bytes,
uri: bytes,
headers: Headers | None = None,
bodyProducer: IBodyProducer | None = None,
bodyProducer: Optional[IBodyProducer] = None,
) -> Generator[defer.Deferred, Any, IResponse]:
"""
Args:

View File

@@ -33,6 +33,7 @@ from typing import (
Callable,
Generic,
Literal,
Optional,
TextIO,
TypeVar,
cast,
@@ -691,7 +692,7 @@ class MatrixFederationHttpClient:
destination_bytes, method_bytes, url_to_sign_bytes, json
)
data = encode_canonical_json(json)
producer: IBodyProducer | None = QuieterFileBodyProducer(
producer: Optional[IBodyProducer] = QuieterFileBodyProducer(
BytesIO(data), cooperator=self._cooperator
)
else:

View File

@@ -22,7 +22,7 @@
import json
import logging
import urllib.parse
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any, Optional, cast
from twisted.internet import protocol
from twisted.internet.interfaces import ITCPTransport
@@ -237,7 +237,7 @@ class _ProxyResponseBody(protocol.Protocol):
request.
"""
transport: ITCPTransport | None = None
transport: Optional[ITCPTransport] = None
def __init__(self, request: "SynapseRequest") -> None:
self._request = request

View File

@@ -21,7 +21,7 @@
import logging
import random
import re
from typing import Any, Collection, Sequence, cast
from typing import Any, Collection, Optional, Sequence, cast
from urllib.parse import urlparse
from urllib.request import ( # type: ignore[attr-defined]
proxy_bypass_environment,
@@ -119,8 +119,8 @@ class ProxyAgent(_AgentBase):
self,
*,
reactor: IReactorCore,
proxy_reactor: IReactorCore | None = None,
contextFactory: IPolicyForHTTPS | None = None,
proxy_reactor: Optional[IReactorCore] = None,
contextFactory: Optional[IPolicyForHTTPS] = None,
connectTimeout: float | None = None,
bindAddress: bytes | None = None,
pool: HTTPConnectionPool | None = None,
@@ -175,7 +175,7 @@ class ProxyAgent(_AgentBase):
self._policy_for_https = contextFactory
self._reactor = cast(IReactorTime, reactor)
self._federation_proxy_endpoint: IStreamClientEndpoint | None = None
self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None
self._federation_proxy_credentials: ProxyCredentials | None = None
if federation_proxy_locations:
assert federation_proxy_credentials is not None, (
@@ -221,7 +221,7 @@ class ProxyAgent(_AgentBase):
method: bytes,
uri: bytes,
headers: Headers | None = None,
bodyProducer: IBodyProducer | None = None,
bodyProducer: Optional[IBodyProducer] = None,
) -> "defer.Deferred[IResponse]":
"""
Issue a request to the server indicated by the given uri.
@@ -365,11 +365,11 @@ class ProxyAgent(_AgentBase):
def http_proxy_endpoint(
proxy: bytes | None,
reactor: IReactorCore,
tls_options_factory: IPolicyForHTTPS | None,
tls_options_factory: Optional[IPolicyForHTTPS],
timeout: float = 30,
bindAddress: bytes | str | tuple[bytes | str, int] | None = None,
attemptDelay: float | None = None,
) -> tuple[IStreamClientEndpoint | None, ProxyCredentials | None]:
) -> tuple[Optional[IStreamClientEndpoint], ProxyCredentials | None]:
"""Parses an http proxy setting and returns an endpoint for the proxy
Args:

View File

@@ -20,6 +20,7 @@
#
import logging
from typing import Optional
from zope.interface import implementer
@@ -149,7 +150,7 @@ class ReplicationAgent(_AgentBase):
method: bytes,
uri: bytes,
headers: Headers | None = None,
bodyProducer: IBodyProducer | None = None,
bodyProducer: Optional[IBodyProducer] = None,
) -> "defer.Deferred[IResponse]":
"""
Issue a request to the server indicated by the given uri.

View File

@@ -25,7 +25,7 @@ import traceback
from collections import deque
from ipaddress import IPv4Address, IPv6Address, ip_address
from math import floor
from typing import Callable
from typing import Callable, Optional
import attr
from zope.interface import implementer
@@ -113,7 +113,7 @@ class RemoteHandler(logging.Handler):
port: int,
maximum_buffer: int = 1000,
level: int = logging.NOTSET,
_reactor: IReactorTime | None = None,
_reactor: Optional[IReactorTime] = None,
):
super().__init__(level=level)
self.host = host

View File

@@ -3,7 +3,7 @@ import time
from logging import Handler, LogRecord
from logging.handlers import MemoryHandler
from threading import Thread
from typing import cast
from typing import Optional, cast
from twisted.internet.interfaces import IReactorCore
@@ -26,7 +26,7 @@ class PeriodicallyFlushingMemoryHandler(MemoryHandler):
target: Handler | None = None,
flushOnClose: bool = True,
period: float = 5.0,
reactor: IReactorCore | None = None,
reactor: Optional[IReactorCore] = None,
) -> None:
"""
period: the period between automatic flushes

View File

@@ -30,6 +30,7 @@ from typing import (
Awaitable,
BinaryIO,
Generator,
Optional,
)
import attr
@@ -705,7 +706,7 @@ class ThreadedFileSender:
self.file: BinaryIO | None = None
self.deferred: "Deferred[None]" = Deferred()
self.consumer: interfaces.IConsumer | None = None
self.consumer: Optional[IConsumer] = None
# Signals if the thread should keep reading/sending data. Set means
# continue, clear means pause.

View File

@@ -439,7 +439,11 @@ class MediaRepository:
return await self.store.get_cached_remote_media(origin, media_id)
async def get_local_media_info(
self, request: SynapseRequest, media_id: str, max_timeout_ms: int
self,
request: SynapseRequest,
media_id: str,
max_timeout_ms: int,
bypass_quarantine: bool = False,
) -> LocalMedia | None:
"""Gets the info dictionary for given local media ID. If the media has
not been uploaded yet, this function will wait up to ``max_timeout_ms``
@@ -451,6 +455,7 @@ class MediaRepository:
the file_id for local content.)
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
bypass_quarantine: whether to bypass quarantine checks
Returns:
Either the info dictionary for the given local media ID or
@@ -466,7 +471,7 @@ class MediaRepository:
respond_404(request)
return None
if media_info.quarantined_by:
if media_info.quarantined_by and not bypass_quarantine:
logger.info("Media %s is quarantined", media_id)
respond_404(request)
return None
@@ -500,6 +505,7 @@ class MediaRepository:
max_timeout_ms: int,
allow_authenticated: bool = True,
federation: bool = False,
bypass_quarantine: bool = False,
) -> None:
"""Responds to requests for local media, if exists, or returns 404.
@@ -513,11 +519,14 @@ class MediaRepository:
media to be uploaded.
allow_authenticated: whether media marked as authenticated may be served to this request
federation: whether the local media being fetched is for a federation request
bypass_quarantine: whether to bypass quarantine checks
Returns:
Resolves once a response has successfully been written to request
"""
media_info = await self.get_local_media_info(request, media_id, max_timeout_ms)
media_info = await self.get_local_media_info(
request, media_id, max_timeout_ms, bypass_quarantine=bypass_quarantine
)
if not media_info:
return
@@ -561,6 +570,7 @@ class MediaRepository:
ip_address: str,
use_federation_endpoint: bool,
allow_authenticated: bool = True,
bypass_quarantine: bool = False,
) -> None:
"""Respond to requests for remote media.
@@ -577,6 +587,7 @@ class MediaRepository:
federation `/download` endpoint
allow_authenticated: whether media marked as authenticated may be served to this
request
bypass_quarantine: whether to bypass quarantine checks
Returns:
Resolves once a response has successfully been written to request
@@ -609,6 +620,7 @@ class MediaRepository:
ip_address,
use_federation_endpoint,
allow_authenticated,
bypass_quarantine=bypass_quarantine,
)
# Check if the media is cached on the client, if so return 304. We need
@@ -697,6 +709,7 @@ class MediaRepository:
ip_address: str,
use_federation_endpoint: bool,
allow_authenticated: bool,
bypass_quarantine: bool = False,
) -> tuple[Responder | None, RemoteMedia]:
"""Looks for media in local cache, if not there then attempt to
download from remote server.
@@ -712,6 +725,7 @@ class MediaRepository:
ip_address: the IP address of the requester
use_federation_endpoint: whether to request the remote media over the new federation
/download endpoint
bypass_quarantine: whether to bypass quarantine checks
Returns:
A tuple of responder and the media info of the file.
@@ -732,7 +746,7 @@ class MediaRepository:
file_id = media_info.filesystem_id
file_info = FileInfo(server_name, file_id)
if media_info.quarantined_by:
if media_info.quarantined_by and not bypass_quarantine:
logger.info("Media is quarantined")
raise NotFoundError()
@@ -914,6 +928,7 @@ class MediaRepository:
filesystem_id=file_id,
last_access_ts=time_now_ms,
quarantined_by=None,
quarantined_ts=None,
authenticated=authenticated,
sha256=sha256writer.hexdigest(),
)
@@ -1047,6 +1062,7 @@ class MediaRepository:
filesystem_id=file_id,
last_access_ts=time_now_ms,
quarantined_by=None,
quarantined_ts=None,
authenticated=authenticated,
sha256=sha256writer.hexdigest(),
)

View File

@@ -331,10 +331,16 @@ class UrlPreviewer:
# response failed or is incomplete.
og_from_html = parse_html_to_open_graph(tree)
# Compile the Open Graph response by using the scraped
# information from the HTML and overlaying any information
# from the oEmbed response.
og = {**og_from_html, **og_from_oembed}
# Compile an Open Graph response by combining the oEmbed response
# and the information from the HTML, with information in the HTML
# preferred.
#
# The ordering here is intentional: certain websites (especially
# SPA JavaScript-based ones) including Mastodon and YouTube provide
# almost complete OpenGraph descriptions but only stubs for oEmbed,
# with further oEmbed information being populated with JavaScript,
# which Synapse won't execute.
og = og_from_oembed | og_from_html
await self._precache_image_url(user, media_info, og)
else:
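The precedence of the union above can be checked in isolation. A minimal sketch with made-up Open Graph keys (not Synapse's real preview data), relying on Python 3.9+'s dict union operator, where the right-hand operand wins on conflicting keys:

```python
# The oEmbed stub from an SPA often carries only minimal fields, while the
# OpenGraph tags scraped from the HTML carry the useful description.
og_from_oembed = {"og:title": "stub", "og:site_name": "Mastodon"}
og_from_html = {"og:title": "Full post text", "og:image": "mxc://example/abc"}

# Keys unique to either side are kept; on conflicts the HTML side wins,
# matching `og = og_from_oembed | og_from_html` in UrlPreviewer.
og = og_from_oembed | og_from_html
```

This is why the operand order matters: reversing it would let the JavaScript-populated oEmbed stubs clobber the complete HTML metadata.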

View File

@@ -20,7 +20,7 @@
#
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from twisted.internet.interfaces import IDelayedCall
@@ -71,7 +71,7 @@ class EmailPusher(Pusher):
self.server_name = hs.hostname
self.store = self.hs.get_datastores().main
self.email = pusher_config.pushkey
self.timed_call: IDelayedCall | None = None
self.timed_call: Optional[IDelayedCall] = None
self.throttle_params: dict[str, ThrottleParams] = {}
self._inited = False

View File

@@ -21,7 +21,7 @@
import logging
import random
import urllib.parse
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
from prometheus_client import Counter
@@ -120,7 +120,7 @@ class HttpPusher(Pusher):
self.data = pusher_config.data
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.failing_since = pusher_config.failing_since
self.timed_call: IDelayedCall | None = None
self.timed_call: Optional[IDelayedCall] = None
self._is_processing = False
self._group_unread_count_by_room = (
hs.config.push.push_group_unread_count_by_room

View File

@@ -114,10 +114,12 @@ from synapse.rest.admin.users import (
UserByThreePid,
UserInvitesCount,
UserJoinedRoomCount,
UserMembershipRestServlet,
UserJoinedRoomsRestServlet,
UserMembershipsRestServlet,
UserRegisterServlet,
UserReplaceMasterCrossSigningKeyRestServlet,
UserRestServletV2,
UserRestServletV2Get,
UsersRestServletV2,
UsersRestServletV3,
UserTokenRestServlet,
@@ -280,6 +282,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
# matrix_authentication_service integration uses the dedicated MAS API.
if hs.config.experimental.msc3861.enabled:
register_servlets_for_msc3861_delegation(hs, http_server)
else:
UserRestServletV2Get(hs).register(http_server)
return
@@ -297,7 +301,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
VersionServlet(hs).register(http_server)
if not auth_delegated:
UserAdminServlet(hs).register(http_server)
UserMembershipRestServlet(hs).register(http_server)
UserJoinedRoomsRestServlet(hs).register(http_server)
UserMembershipsRestServlet(hs).register(http_server)
if not auth_delegated:
UserTokenRestServlet(hs).register(http_server)
UserRestServletV2(hs).register(http_server)

View File

@@ -293,6 +293,38 @@ class ListMediaInRoom(RestServlet):
return HTTPStatus.OK, {"local": local_mxcs, "remote": remote_mxcs}
class ListQuarantinedMedia(RestServlet):
"""Lists all quarantined media on the server."""
PATTERNS = admin_patterns("/media/quarantined$")
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.auth = hs.get_auth()
async def on_GET(
self,
request: SynapseRequest,
) -> tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
start = parse_integer(request, "from", default=0)
limit = parse_integer(request, "limit", default=100)
local_or_remote = parse_string(request, "kind", required=True)
if local_or_remote not in ["local", "remote"]:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Query parameter `kind` must be either 'local' or 'remote'.",
)
mxcs = await self.store.get_quarantined_media_mxcs(
start, limit, local_or_remote == "local"
)
return HTTPStatus.OK, {"media": mxcs}
class PurgeMediaCacheRestServlet(RestServlet):
PATTERNS = admin_patterns("/purge_media_cache$")
@@ -532,6 +564,7 @@ def register_servlets_for_media_repo(hs: "HomeServer", http_server: HttpServer)
ProtectMediaByID(hs).register(http_server)
UnprotectMediaByID(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
ListQuarantinedMedia(hs).register(http_server)
# XXX DeleteMediaByDateSize must be registered before DeleteMediaByID as
# their URL routes overlap.
DeleteMediaByDateSize(hs).register(http_server)

View File

@@ -210,7 +210,7 @@ class UsersRestServletV3(UsersRestServletV2):
return parse_boolean(request, "deactivated")
class UserRestServletV2(RestServlet):
class UserRestServletV2Get(RestServlet):
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)$", "v2")
"""Get request to list user details.
@@ -220,22 +220,6 @@ class UserRestServletV2(RestServlet):
returns:
200 OK with user details if success otherwise an error.
Put request to allow an administrator to add or modify a user.
This needs user to have administrator access in Synapse.
We use PUT instead of POST since we already know the id of the user
object to create. POST could be used to create guests.
PUT /_synapse/admin/v2/users/<user_id>
{
"password": "secret",
"displayname": "User"
}
returns:
201 OK with new user object if user was created or
200 OK with modified user object if user was modified
otherwise an error.
"""
def __init__(self, hs: "HomeServer"):
@@ -267,6 +251,28 @@ class UserRestServletV2(RestServlet):
return HTTPStatus.OK, user_info_dict
class UserRestServletV2(UserRestServletV2Get):
"""
Put request to allow an administrator to add or modify a user.
This needs user to have administrator access in Synapse.
We use PUT instead of POST since we already know the id of the user
object to create. POST could be used to create guests.
Note: This inherits from `UserRestServletV2Get`, so also supports the `GET` route.
PUT /_synapse/admin/v2/users/<user_id>
{
"password": "secret",
"displayname": "User"
}
returns:
201 OK with new user object if user was created or
200 OK with modified user object if user was modified
otherwise an error.
"""
async def on_PUT(
self, request: SynapseRequest, user_id: str
) -> tuple[int, JsonMapping]:
@@ -1031,7 +1037,7 @@ class UserAdminServlet(RestServlet):
return HTTPStatus.OK, {}
class UserMembershipRestServlet(RestServlet):
class UserJoinedRoomsRestServlet(RestServlet):
"""
Get the list of joined room IDs for a user.
"""
@@ -1054,6 +1060,28 @@ class UserMembershipRestServlet(RestServlet):
return HTTPStatus.OK, rooms_response
class UserMembershipsRestServlet(RestServlet):
"""
Get list of room memberships for a user.
"""
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/memberships$")
def __init__(self, hs: "HomeServer"):
self.is_mine = hs.is_mine
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
async def on_GET(
self, request: SynapseRequest, user_id: str
) -> tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
memberships = await self.store.get_memberships_for_user(user_id)
return HTTPStatus.OK, {"memberships": memberships}
class PushersRestServlet(RestServlet):
"""
Gets information about all pushers for a specific `user_id`.

View File

@@ -23,6 +23,7 @@
import logging
import re
from synapse.api.errors import Codes, cs_error
from synapse.http.server import (
HttpServer,
respond_with_json,
@@ -235,7 +236,23 @@ class DownloadResource(RestServlet):
# Validate the server name, raising if invalid
parse_and_validate_server_name(server_name)
await self.auth.get_user_by_req(request, allow_guest=True)
requester = await self.auth.get_user_by_req(request, allow_guest=True)
is_admin = await self.auth.is_server_admin(requester)
bypass_quarantine = False
if parse_string(request, "admin_unsafely_bypass_quarantine") == "true":
if is_admin:
logger.info("Admin bypassing quarantine for media download")
bypass_quarantine = True
else:
respond_with_json(
request,
400,
cs_error(
"Must be a server admin to bypass quarantine",
code=Codes.UNKNOWN,
),
send_cors=True,
)
return
set_cors_headers(request)
set_corp_headers(request)
@@ -259,7 +276,11 @@ class DownloadResource(RestServlet):
if self._is_mine_server_name(server_name):
await self.media_repo.get_local_media(
request, media_id, file_name, max_timeout_ms
request,
media_id,
file_name,
max_timeout_ms,
bypass_quarantine=bypass_quarantine,
)
else:
ip_address = request.getClientAddress().host
@@ -271,6 +292,7 @@ class DownloadResource(RestServlet):
max_timeout_ms,
ip_address,
True,
bypass_quarantine=bypass_quarantine,
)

View File

@@ -19,9 +19,12 @@
#
#
import logging
from bisect import bisect
from http import HTTPStatus
from typing import TYPE_CHECKING
from unpaddedbase64 import decode_base64, encode_base64
from synapse.api.errors import Codes, SynapseError
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_strings_from_args
@@ -35,10 +38,34 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
MUTUAL_ROOMS_BATCH_LIMIT = 100
def _parse_mutual_rooms_batch_token_args(args: dict[bytes, list[bytes]]) -> str | None:
from_batches = parse_strings_from_args(args, "from")
if not from_batches:
return None
if len(from_batches) > 1:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Duplicate from query parameter",
errcode=Codes.INVALID_PARAM,
)
if from_batches[0]:
try:
return decode_base64(from_batches[0]).decode("utf-8")
except Exception:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Malformed from token",
errcode=Codes.INVALID_PARAM,
)
return None
class UserMutualRoomsServlet(RestServlet):
"""
GET /uk.half-shot.msc2666/user/mutual_rooms?user_id={user_id} HTTP/1.1
GET /uk.half-shot.msc2666/user/mutual_rooms?user_id={user_id}&from={token} HTTP/1.1
"""
PATTERNS = client_patterns(
@@ -56,6 +83,7 @@ class UserMutualRoomsServlet(RestServlet):
args: dict[bytes, list[bytes]] = request.args # type: ignore
user_ids = parse_strings_from_args(args, "user_id", required=True)
from_batch = _parse_mutual_rooms_batch_token_args(args)
if len(user_ids) > 1:
raise SynapseError(
@@ -64,29 +92,52 @@ class UserMutualRoomsServlet(RestServlet):
errcode=Codes.INVALID_PARAM,
)
# We don't do batching, so a batch token is illegal by default
if b"batch_token" in args:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Unknown batch_token",
errcode=Codes.INVALID_PARAM,
)
user_id = user_ids[0]
requester = await self.auth.get_user_by_req(request)
if user_id == requester.user.to_string():
raise SynapseError(
HTTPStatus.UNPROCESSABLE_ENTITY,
HTTPStatus.BAD_REQUEST,
"You cannot request a list of shared rooms with yourself",
errcode=Codes.INVALID_PARAM,
errcode=Codes.UNKNOWN,
)
rooms = await self.store.get_mutual_rooms_between_users(
frozenset((requester.user.to_string(), user_id))
# Sort here instead of the database function, so that we don't expose
# clients to any unrelated changes to the sorting algorithm.
rooms = sorted(
await self.store.get_mutual_rooms_between_users(
frozenset((requester.user.to_string(), user_id))
)
)
return 200, {"joined": list(rooms)}
if from_batch:
# A from_batch token was provided, so cut off any rooms where the ID is
# lower than or equal to the token. This method doesn't care whether the
# provided token room still exists, nor whether it's even a real room ID.
#
# However, if rooms with a lower ID are added after the token was issued,
# they will not be included until the client makes a new request without a
# from token. This is considered acceptable, as clients generally won't
# persist these results for long periods.
rooms = rooms[bisect(rooms, from_batch) :]
if len(rooms) <= MUTUAL_ROOMS_BATCH_LIMIT:
# We've reached the end of the list, don't return a batch token
return 200, {"joined": rooms}
rooms = rooms[:MUTUAL_ROOMS_BATCH_LIMIT]
# We use urlsafe unpadded base64 encoding for the batch token in order to
# handle funny room IDs in old pre-v12 rooms properly. We also truncate it
# to stay within the 255-character limit of opaque tokens.
next_batch = encode_base64(rooms[-1].encode("utf-8"), urlsafe=True)[:255]
# Due to the truncation, it is technically possible to have conflicting next
# batches by creating hundreds of rooms with the same 191 character prefix
# in the room ID. In the event that some silly user does that, don't let
# them paginate further.
if next_batch == from_batch:
return 200, {"joined": rooms}
return 200, {"joined": list(rooms), "next_batch": next_batch}
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:

View File

@@ -34,6 +34,7 @@ from typing import (
Any,
Awaitable,
Callable,
Optional,
TypeVar,
cast,
)
@@ -320,7 +321,7 @@ class HomeServer(metaclass=abc.ABCMeta):
self,
hostname: str,
config: HomeServerConfig,
reactor: ISynapseReactor | None = None,
reactor: Optional[ISynapseReactor] = None,
):
"""
Args:
@@ -353,7 +354,7 @@ class HomeServer(metaclass=abc.ABCMeta):
self._module_web_resources_consumed = False
# This attribute is set by the free function `refresh_certificate`.
self.tls_server_context_factory: IOpenSSLContextFactory | None = None
self.tls_server_context_factory: Optional[IOpenSSLContextFactory] = None
self._is_shutdown = False
self._async_shutdown_handlers: list[ShutdownInfo] = []

View File

@@ -61,6 +61,7 @@ class LocalMedia:
url_cache: str | None
last_access_ts: int
quarantined_by: str | None
quarantined_ts: int | None
safe_from_quarantine: bool
user_id: str | None
authenticated: bool | None
@@ -78,6 +79,7 @@ class RemoteMedia:
created_ts: int
last_access_ts: int
quarantined_by: str | None
quarantined_ts: int | None
authenticated: bool | None
sha256: str | None
@@ -243,6 +245,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"user_id",
"authenticated",
"sha256",
"quarantined_ts",
),
allow_none=True,
desc="get_local_media",
@@ -262,6 +265,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
user_id=row[8],
authenticated=row[9],
sha256=row[10],
quarantined_ts=row[11],
)
async def get_local_media_by_user_paginate(
@@ -319,7 +323,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
safe_from_quarantine,
user_id,
authenticated,
sha256
sha256,
quarantined_ts
FROM local_media_repository
WHERE user_id = ?
ORDER BY {order_by_column} {order}, media_id ASC
@@ -345,6 +350,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
user_id=row[9],
authenticated=row[10],
sha256=row[11],
quarantined_ts=row[12],
)
for row in txn
]
@@ -695,6 +701,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"quarantined_by",
"authenticated",
"sha256",
"quarantined_ts",
),
allow_none=True,
desc="get_cached_remote_media",
@@ -713,6 +720,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
quarantined_by=row[6],
authenticated=row[7],
sha256=row[8],
quarantined_ts=row[9],
)
async def store_cached_remote_media(

View File

@@ -945,6 +945,50 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
max_lifetime=max_lifetime,
)
async def get_quarantined_media_mxcs(
self, index_start: int, index_limit: int, local: bool
) -> list[str]:
"""Retrieves all the quarantined media MXC URIs starting from the given position,
ordered from oldest quarantined timestamp, then alphabetically by media ID
(including origin).
Note that on established servers the quarantined timestamp may be zero for
media that was quarantined before the `quarantined_ts` field was introduced.
Args:
index_start: The position to start from.
index_limit: The maximum number of results to return.
local: When true, only local media will be returned. When false, only remote media will be returned.
Returns:
The quarantined media as a list of media IDs.
"""
def _get_quarantined_media_mxcs_txn(
txn: LoggingTransaction,
) -> list[str]:
# We order by quarantined timestamp *and* media ID (including origin, when
# known) to ensure the ordering is stable for established servers.
if local:
sql = "SELECT '' as media_origin, media_id FROM local_media_repository WHERE quarantined_by IS NOT NULL ORDER BY quarantined_ts, media_id ASC LIMIT ? OFFSET ?"
else:
sql = "SELECT media_origin, media_id FROM remote_media_cache WHERE quarantined_by IS NOT NULL ORDER BY quarantined_ts, media_origin, media_id ASC LIMIT ? OFFSET ?"
txn.execute(sql, (index_limit, index_start))
mxcs = []
for media_origin, media_id in txn:
if local:
media_origin = self.hs.hostname
mxcs.append(f"mxc://{media_origin}/{media_id}")
return mxcs
return await self.db_pool.runInteraction(
"get_quarantined_media_mxcs",
_get_quarantined_media_mxcs_txn,
)
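The stability argument for the `ORDER BY` clause can be demonstrated against an in-memory table. A minimal sqlite3 sketch using a hypothetical subset of the `local_media_repository` schema (not Synapse's real migrations), where two rows tie on `quarantined_ts`:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE local_media_repository "
    "(media_id TEXT, quarantined_by TEXT, quarantined_ts INTEGER)"
)
# Two rows share quarantined_ts = 0 (e.g. media quarantined before the
# timestamp column existed); ordering by timestamp alone would leave their
# relative order unspecified, so pages could overlap or skip rows.
conn.executemany(
    "INSERT INTO local_media_repository VALUES (?, ?, ?)",
    [("bbb", "@admin:hs", 0), ("aaa", "@admin:hs", 0), ("ccc", "@admin:hs", 5)],
)


def page(limit: int, offset: int) -> list[str]:
    """Fetch one page of quarantined media IDs with a stable ordering."""
    rows = conn.execute(
        "SELECT media_id FROM local_media_repository "
        "WHERE quarantined_by IS NOT NULL "
        "ORDER BY quarantined_ts, media_id ASC LIMIT ? OFFSET ?",
        (limit, offset),
    )
    # Tie-breaking on media_id keeps successive pages disjoint.
    return [media_id for (media_id,) in rows]
```

With this ordering, `page(2, 0)` and `page(2, 2)` partition the three rows cleanly regardless of insertion order.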
async def get_media_mxcs_in_room(self, room_id: str) -> tuple[list[str], list[str]]:
"""Retrieves all the local and remote media MXC URIs in a given room
@@ -952,7 +996,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
room_id
Returns:
The local and remote media as a lists of the media IDs.
The local and remote media as lists of the media IDs.
"""
def _get_media_mxcs_in_room_txn(
@@ -1147,6 +1191,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
The total number of media items quarantined
"""
total_media_quarantined = 0
now_ts: int | None = self.clock.time_msec()
if quarantined_by is None:
now_ts = None
# Effectively a legacy path, update any media that was explicitly named.
if media_ids:
@@ -1155,13 +1203,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
sql = f"""
UPDATE local_media_repository
SET quarantined_by = ?
SET quarantined_by = ?, quarantined_ts = ?
WHERE {sql_many_clause_sql}"""
if quarantined_by is not None:
sql += " AND safe_from_quarantine = FALSE"
txn.execute(sql, [quarantined_by] + sql_many_clause_args)
txn.execute(sql, [quarantined_by, now_ts] + sql_many_clause_args)
# Note that a rowcount of -1 can be used to indicate no rows were affected.
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
@@ -1172,13 +1220,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
sql = f"""
UPDATE local_media_repository
SET quarantined_by = ?
SET quarantined_by = ?, quarantined_ts = ?
WHERE {sql_many_clause_sql}"""
if quarantined_by is not None:
sql += " AND safe_from_quarantine = FALSE"
txn.execute(sql, [quarantined_by] + sql_many_clause_args)
txn.execute(sql, [quarantined_by, now_ts] + sql_many_clause_args)
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
return total_media_quarantined
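The `now_ts` handling above pairs the two columns: quarantining stamps the current time into `quarantined_ts`, while un-quarantining (a `None` `quarantined_by`) clears the timestamp too. A small sketch of that pairing (the helper name is hypothetical):

```python
# Sketch: parameters passed to the UPDATE for (quarantined_by, quarantined_ts).
# Quarantining records the current time; un-quarantining clears both columns.


def quarantine_params(quarantined_by, now_ms):
    now_ts = now_ms if quarantined_by is not None else None
    return (quarantined_by, now_ts)
```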
@@ -1202,6 +1250,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
The total number of media items quarantined
"""
total_media_quarantined = 0
now_ts: int | None = self.clock.time_msec()
if quarantined_by is None:
now_ts = None
if media:
sql_in_list_clause, sql_args = make_tuple_in_list_sql_clause(
@@ -1211,10 +1263,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
sql = f"""
UPDATE remote_media_cache
SET quarantined_by = ?
SET quarantined_by = ?, quarantined_ts = ?
WHERE {sql_in_list_clause}"""
txn.execute(sql, [quarantined_by] + sql_args)
txn.execute(sql, [quarantined_by, now_ts] + sql_args)
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
total_media_quarantined = 0
@@ -1224,9 +1276,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
sql = f"""
UPDATE remote_media_cache
SET quarantined_by = ?
SET quarantined_by = ?, quarantined_ts = ?
WHERE {sql_many_clause_sql}"""
txn.execute(sql, [quarantined_by] + sql_many_clause_args)
txn.execute(sql, [quarantined_by, now_ts] + sql_many_clause_args)
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
return total_media_quarantined


@@ -747,6 +747,27 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
return frozenset(room_ids)
async def get_memberships_for_user(self, user_id: str) -> dict[str, str]:
"""Returns a dict of room_id to membership state for a given user.
For a remote user, only returns rooms this server is currently
participating in.
"""
rows = cast(
list[tuple[str, str]],
await self.db_pool.simple_select_list(
"current_state_events",
keyvalues={
"type": EventTypes.Member,
"state_key": user_id,
},
retcols=["room_id", "membership"],
desc="get_memberships_for_user",
),
)
return dict(rows)
@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user(self, user_id: str) -> frozenset[str]:
"""Returns a set of room_ids the user is currently joined to.


@@ -14,7 +14,7 @@
import logging
from typing import TYPE_CHECKING, Mapping, cast
from typing import TYPE_CHECKING, AbstractSet, Mapping, cast
import attr
@@ -26,13 +26,16 @@ from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.types import MultiWriterStreamToken, RoomStreamToken
from synapse.types.handlers.sliding_sync import (
HaveSentRoom,
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
RoomLazyMembershipChanges,
RoomStatusMap,
RoomSyncConfig,
)
@@ -373,6 +376,13 @@ class SlidingSyncStore(SQLBaseStore):
value_values=values,
)
self._persist_sliding_sync_connection_lazy_members_txn(
txn,
connection_key,
connection_position,
per_connection_state.room_lazy_membership,
)
return connection_position
@cached(iterable=True, max_entries=100000)
@@ -446,6 +456,23 @@ class SlidingSyncStore(SQLBaseStore):
"""
txn.execute(sql, (connection_key, connection_position))
# Move any lazy membership entries for this connection position to have
# `NULL` connection position, indicating that it applies to all future
# positions on this connection. This is safe because we have deleted all
# other (potentially forked) connection positions, and so all future
# positions in this connection will be a continuation of the current
# position. Thus any lazy membership entries we have sent down will still
# be valid.
self.db_pool.simple_update_txn(
txn,
table="sliding_sync_connection_lazy_members",
keyvalues={
"connection_key": connection_key,
"connection_position": connection_position,
},
updatevalues={"connection_position": None},
)
# Fetch and create a mapping from required state ID to the actual
# required state for the connection.
rows = self.db_pool.simple_select_list_txn(
@@ -525,8 +552,153 @@ class SlidingSyncStore(SQLBaseStore):
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
room_configs=room_configs,
room_lazy_membership={},
)
async def get_sliding_sync_connection_lazy_members(
self,
connection_position: int,
room_id: str,
user_ids: AbstractSet[str],
) -> Mapping[str, int]:
"""Get which user IDs in the room we have previously sent lazy
membership for.
Args:
connection_position: The sliding sync connection position.
room_id: The room ID to get lazy members for.
user_ids: The user IDs to check whether we've previously sent
because of lazy membership.
Returns:
The mapping of user IDs to the last seen timestamp for those user
IDs. Only includes user IDs that we have previously sent lazy
membership for, and so may be a subset of the `user_ids` passed in.
"""
def get_sliding_sync_connection_lazy_members_txn(
txn: LoggingTransaction,
) -> Mapping[str, int]:
user_clause, user_args = make_in_list_sql_clause(
txn.database_engine, "user_id", user_ids
)
# Fetch all the lazy membership entries for the given connection,
# room and user IDs. We don't have the `connection_key` here, so we
# join against `sliding_sync_connection_positions` to get it.
#
# Beware that there are two `connection_position` columns in the
# query which are different, the one in
# `sliding_sync_connection_positions` is the one we match to get the
# connection_key, whereas the one in
# `sliding_sync_connection_lazy_members` is what we filter against
# (it may be null or the same as the one passed in).
#
# FIXME: We should pass in `connection_key` here to avoid the join.
# We don't do this currently as the caller doesn't have it handy.
sql = f"""
SELECT user_id, members.connection_position, last_seen_ts
FROM sliding_sync_connection_lazy_members AS members
INNER JOIN sliding_sync_connection_positions AS pos USING (connection_key)
WHERE pos.connection_position = ? AND room_id = ? AND {user_clause}
"""
txn.execute(sql, (connection_position, room_id, *user_args))
# Filter out any cache entries that only apply to forked connection
# positions. Entries with `NULL` `connection_position` apply to all
# positions on the connection.
return {
user_id: last_seen_ts
for user_id, db_connection_position, last_seen_ts in txn
if db_connection_position == connection_position
or db_connection_position is None
}
return await self.db_pool.runInteraction(
"get_sliding_sync_connection_lazy_members",
get_sliding_sync_connection_lazy_members_txn,
db_autocommit=True, # Avoid transaction for single read
)
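The filtering at the end of the read path can be isolated: a row with a `NULL` (`None`) `connection_position` is valid for every position on the connection, while a row with a concrete position only counts for that exact position (anything else belongs to a forked position). A minimal in-memory sketch:

```python
# Sketch of the connection-position filter applied to rows read from
# sliding_sync_connection_lazy_members.


def filter_lazy_members(rows, connection_position):
    # rows: iterable of (user_id, db_connection_position, last_seen_ts)
    return {
        user_id: last_seen_ts
        for user_id, db_pos, last_seen_ts in rows
        if db_pos is None or db_pos == connection_position
    }


rows = [
    ("@alice:test", None, 100),  # NULL position: valid for all positions
    ("@bob:test", 7, 200),       # only valid for position 7
    ("@carol:test", 8, 300),     # forked position: filtered out for position 7
]
```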
def _persist_sliding_sync_connection_lazy_members_txn(
self,
txn: LoggingTransaction,
connection_key: int,
new_connection_position: int,
all_changes: dict[str, RoomLazyMembershipChanges],
) -> None:
"""Persist that we have sent lazy membership for the given user IDs."""
now = self.clock.time_msec()
# Figure out which cache entries to add or update.
#
# These are either a) new entries we've never sent before (i.e. with a
# None last_seen_ts), or b) where the `last_seen_ts` is old enough that
# we want to update it.
#
# We don't update the timestamp every time to avoid hammering the DB
# with writes, and we don't need the timestamp to be precise. It is used
# to evict old entries that haven't been used in a while.
to_update: list[tuple[str, str]] = []
for room_id, room_changes in all_changes.items():
user_ids_to_update = room_changes.get_returned_user_ids_to_update(
self.clock
)
to_update.extend((room_id, user_id) for user_id in user_ids_to_update)
if to_update:
# Upsert the new/updated entries.
#
# Ignore conflicts where the existing entry has a different
# connection position (i.e. from a forked connection position). This
# may mean that we lose some updates, but that's acceptable as this
# is a cache and it's fine for it to *not* include rows. (Downstream
# this will cause us to maybe send a few extra lazy members down
# sync, but we're allowed to send extra members).
sql = """
INSERT INTO sliding_sync_connection_lazy_members
(connection_key, connection_position, room_id, user_id, last_seen_ts)
VALUES {value_placeholder}
ON CONFLICT (connection_key, room_id, user_id)
DO UPDATE SET last_seen_ts = EXCLUDED.last_seen_ts
WHERE sliding_sync_connection_lazy_members.connection_position IS NULL
OR sliding_sync_connection_lazy_members.connection_position = EXCLUDED.connection_position
"""
args = [
(connection_key, new_connection_position, room_id, user_id, now)
for room_id, user_id in to_update
]
if isinstance(self.database_engine, PostgresEngine):
sql = sql.format(value_placeholder="?")
txn.execute_values(sql, args, fetch=False)
else:
sql = sql.format(value_placeholder="(?, ?, ?, ?, ?)")
txn.execute_batch(sql, args)
# Remove any invalidated entries.
to_remove: list[tuple[str, str]] = []
for room_id, room_changes in all_changes.items():
for user_id in room_changes.invalidated_user_ids:
to_remove.append((room_id, user_id))
if to_remove:
# We don't try and match on connection position here: it's fine to
# remove it from all forks. This is a cache so it's fine to expire
# arbitrary entries, the worst that happens is we send a few extra
# lazy members down sync.
self.db_pool.simple_delete_many_batch_txn(
txn,
table="sliding_sync_connection_lazy_members",
keys=("connection_key", "room_id", "user_id"),
values=[
(connection_key, room_id, user_id) for room_id, user_id in to_remove
],
)
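The conditional `DO UPDATE ... WHERE` in the upsert above can be mirrored in plain Python to make the forked-position behaviour concrete: an existing row is only refreshed when its `connection_position` is `NULL` or matches the new position; rows belonging to a forked position are left alone, accepting the occasional stale cache entry. A sketch, with an in-memory dict standing in for the table:

```python
# Sketch of the upsert's conflict handling for
# sliding_sync_connection_lazy_members (in-memory, no SQL).


def upsert_lazy_member(table, key, new_position, now_ms):
    # table: {(connection_key, room_id, user_id):
    #         {"connection_position": int | None, "last_seen_ts": int}}
    existing = table.get(key)
    if existing is None:
        table[key] = {"connection_position": new_position, "last_seen_ts": now_ms}
    elif (
        existing["connection_position"] is None
        or existing["connection_position"] == new_position
    ):
        # NULL applies to all positions; a matching position is a continuation.
        existing["last_seen_ts"] = now_ms
    # else: the row belongs to a forked position; skipping the update is an
    # acceptable cache miss (we may just resend a few lazy members later).


table = {}
key = (1, "!room:test", "@alice:test")
upsert_lazy_member(table, key, 5, 100)  # insert
upsert_lazy_member(table, key, 5, 200)  # same position: refresh timestamp
upsert_lazy_member(table, key, 6, 300)  # forked position: no-op
```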
@wrap_as_background_process("delete_old_sliding_sync_connections")
async def delete_old_sliding_sync_connections(self) -> None:
"""Delete sliding sync connections that have not been used for a long time."""
@@ -564,6 +736,10 @@ class PerConnectionStateDB:
room_configs: Mapping[str, "RoomSyncConfig"]
room_lazy_membership: dict[str, RoomLazyMembershipChanges]
"""Lazy membership changes to persist alongside this state. Only used
when persisting."""
@staticmethod
async def from_state(
per_connection_state: "MutablePerConnectionState", store: "DataStore"
@@ -618,6 +794,7 @@ class PerConnectionStateDB:
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
room_configs=per_connection_state.room_configs.maps[0],
room_lazy_membership=per_connection_state.room_lazy_membership,
)
async def to_state(self, store: "DataStore") -> "PerConnectionState":


@@ -0,0 +1,60 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 Element Creations Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Tracks which member states have been sent to the client for lazy-loaded
-- members in sliding sync. This is a *cache* as it doesn't matter if we send
-- down members we've previously sent down, i.e. it's safe to delete any rows.
--
-- We could have tracked these as part of the
-- `sliding_sync_connection_required_state` table, but that would bloat that
-- table significantly as most rooms will have lazy-loaded members. We want to
-- keep that table small as we always pull out all rows for the connection for
-- every request, so storing lots of data there would be bad for performance. To
-- keep that table small we also deduplicate the requested state across
-- different rooms, which if we stored lazy members there would prevent.
--
-- We track a *rough* `last_seen_ts` for each user in each room which indicates
-- when we last would've sent their member state to the client. `last_seen_ts`
-- is used so that we can remove members which haven't been seen for a while to
-- save space. This is a *rough* timestamp as we don't want to update the
-- timestamp every time to avoid hammering the DB with writes, and we don't need
-- the timestamp to be precise (as it is used to evict old entries that haven't
-- been used in a while).
--
-- Care must be taken when handling "forked" positions, i.e. we have responded
-- to a request with a position and then get another different request using the
-- previous position as a base. We track this by including a
-- `connection_position` for newly inserted rows. When we advance the position
-- we set this to NULL for all rows which were present at that position, and
-- delete all other rows. When reading rows we can then filter out any rows
-- which have a non-NULL `connection_position` which is not the current
-- position.
--
-- I.e. `connection_position` is NULL for rows which are valid for *all*
-- positions on the connection, and is non-NULL for rows which are only valid
-- for a specific position.
--
-- When invalidating rows, we can just delete them. Technically this could
-- invalidate for a forked position, but this is acceptable as equivalent to a
-- cache eviction.
CREATE TABLE sliding_sync_connection_lazy_members (
connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
connection_position BIGINT REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE,
room_id TEXT NOT NULL,
user_id TEXT NOT NULL,
last_seen_ts BIGINT NOT NULL
);
CREATE UNIQUE INDEX sliding_sync_connection_lazy_members_idx ON sliding_sync_connection_lazy_members (connection_key, room_id, user_id);
CREATE INDEX sliding_sync_connection_lazy_members_pos_idx ON sliding_sync_connection_lazy_members (connection_key, connection_position) WHERE connection_position IS NOT NULL;


@@ -0,0 +1,27 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 Element Creations, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Add a timestamp for when media was quarantined.
--
-- Existing quarantined media has no recorded quarantine time, so we backfill
-- the new column with zero for those rows; media that is not quarantined keeps
-- a NULL value.
ALTER TABLE local_media_repository ADD COLUMN quarantined_ts BIGINT;
ALTER TABLE remote_media_cache ADD COLUMN quarantined_ts BIGINT;
UPDATE local_media_repository SET quarantined_ts = 0 WHERE quarantined_by IS NOT NULL;
UPDATE remote_media_cache SET quarantined_ts = 0 WHERE quarantined_by IS NOT NULL;
-- Note: We *probably* should have an index on quarantined_ts, but we're going
-- to try to defer that to a future migration after seeing the performance impact.


@@ -49,12 +49,21 @@ from synapse.types import (
UserID,
)
from synapse.types.rest.client import SlidingSyncBody
from synapse.util.clock import Clock
from synapse.util.duration import Duration
if TYPE_CHECKING:
from synapse.handlers.relations import BundledAggregations
logger = logging.getLogger(__name__)
# How often to update the last seen timestamp for lazy members.
#
# We don't update the timestamp every time to avoid hammering the DB with
# writes, and we don't need the timestamp to be precise (as it is used to evict
# old entries that haven't been used in a while).
LAZY_MEMBERS_UPDATE_INTERVAL = Duration(hours=1)
class SlidingSyncConfig(SlidingSyncBody):
"""
@@ -891,6 +900,69 @@ class PerConnectionState:
return len(self.rooms) + len(self.receipts) + len(self.room_configs)
@attr.s(auto_attribs=True)
class RoomLazyMembershipChanges:
"""Changes to lazily-loaded room memberships for a given room."""
returned_user_id_to_last_seen_ts_map: Mapping[str, int | None] = attr.Factory(dict)
"""Map from user ID to timestamp for users whose membership we have lazily
loaded in this room in this request. The timestamp indicates the time we
previously needed the membership, or None if we sent it down for the first
time in this request.
We track a *rough* `last_seen_ts` for each user in each room which indicates
when we last would've sent their member state to the client. This is used so
that we can remove members which haven't been seen for a while to save
space.
Note: this also includes users whose membership we skipped sending down
because we had previously sent it.
"""
invalidated_user_ids: AbstractSet[str] = attr.Factory(set)
"""Set of user IDs whose latest membership we have *not* sent down"""
def get_returned_user_ids_to_update(self, clock: Clock) -> StrCollection:
"""Get the user IDs whose last seen timestamp we need to update in the
database.
This is a subset of user IDs in `returned_user_id_to_last_seen_ts_map`,
whose timestamp is either None (first time we've sent them) or older
than `LAZY_MEMBERS_UPDATE_INTERVAL`.
We only update the timestamp in the database every so often to avoid
hammering the DB with writes. We don't need the timestamp to be precise,
as the timestamp is used to evict old entries that haven't been used in
a while.
"""
now_ms = clock.time_msec()
return [
user_id
for user_id, last_seen_ts in self.returned_user_id_to_last_seen_ts_map.items()
if last_seen_ts is None
or now_ms - last_seen_ts >= LAZY_MEMBERS_UPDATE_INTERVAL.as_millis()
]
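The throttling in `get_returned_user_ids_to_update` can be shown standalone: an entry is written back only when it has never been persisted (`None`) or when its stored timestamp is at least `LAZY_MEMBERS_UPDATE_INTERVAL` old, which keeps write volume low while still letting stale entries be evicted. A minimal sketch with a constant standing in for the interval:

```python
# Sketch of the throttled last-seen update for lazy members.

HOUR_MS = 60 * 60 * 1000  # stands in for LAZY_MEMBERS_UPDATE_INTERVAL


def users_to_update(last_seen_map, now_ms, interval_ms=HOUR_MS):
    # last_seen_map: {user_id: last_seen_ts | None}
    return [
        user_id
        for user_id, last_seen in last_seen_map.items()
        if last_seen is None or now_ms - last_seen >= interval_ms
    ]


now = 10 * HOUR_MS
members = {
    "@new:test": None,           # never persisted: always update
    "@stale:test": 8 * HOUR_MS,  # 2 hours old: update
    "@fresh:test": now - 1,      # just refreshed: skip
}
```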
def has_updates(self, clock: Clock) -> bool:
"""Check if there are any updates to the lazy membership changes.
Called to check if we need to persist changes to the lazy membership
state for the room. We want to avoid persisting the state if there are
no changes, to avoid unnecessary writes (and cache misses due to new
connection position).
"""
# We consider there to be updates if there are any invalidated user
# IDs...
if self.invalidated_user_ids:
return True
# ...or if any of the returned user IDs need their last seen timestamp
# updating in the database.
return bool(self.get_returned_user_ids_to_update(clock))
@attr.s(auto_attribs=True)
class MutablePerConnectionState(PerConnectionState):
"""A mutable version of `PerConnectionState`"""
@@ -903,12 +975,28 @@ class MutablePerConnectionState(PerConnectionState):
room_configs: typing.ChainMap[str, RoomSyncConfig]
def has_updates(self) -> bool:
# A map from room ID to the lazily-loaded memberships needed for the
# request in that room.
room_lazy_membership: dict[str, RoomLazyMembershipChanges] = attr.Factory(dict)
def has_updates(self, clock: Clock) -> bool:
"""Check if there are any updates to the per-connection state that need
persisting.
It is important that we don't spuriously do persistence, as that will
always generate a new connection position which will invalidate some of
the caches. It doesn't need to be perfect, but we should avoid always
generating new connection positions when doing lazy loading
"""
return (
bool(self.rooms.get_updates())
or bool(self.receipts.get_updates())
or bool(self.account_data.get_updates())
or bool(self.get_room_config_updates())
or any(
change.has_updates(clock)
for change in self.room_lazy_membership.values()
)
)
def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:


@@ -19,7 +19,7 @@
#
import queue
from typing import Any, BinaryIO, cast
from typing import Any, BinaryIO, Optional, Union, cast
from twisted.internet import threads
from twisted.internet.defer import Deferred
@@ -50,7 +50,7 @@ class BackgroundFileConsumer:
self._reactor: ISynapseReactor = reactor
# Producer we're registered with
self._producer: IPushProducer | IPullProducer | None = None
self._producer: Optional[Union[IPushProducer, IPullProducer]] = None
# True if PushProducer, false if PullProducer
self.streaming = False
@@ -72,7 +72,7 @@ class BackgroundFileConsumer:
self._write_exception: Exception | None = None
def registerProducer(
self, producer: IPushProducer | IPullProducer, streaming: bool
self, producer: Union[IPushProducer, IPullProducer], streaming: bool
) -> None:
"""Part of IConsumer interface


@@ -18,7 +18,7 @@
#
#
import logging
from typing import AbstractSet, Mapping
from typing import AbstractSet
from unittest.mock import patch
import attr
@@ -38,13 +38,17 @@ from synapse.handlers.sliding_sync import (
RoomSyncConfig,
StateValues,
_required_state_changes,
_RequiredStateChangesReturn,
)
from synapse.rest import admin
from synapse.rest.client import knock, login, room
from synapse.server import HomeServer
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict, StateMap, StreamToken, UserID, create_requester
from synapse.types.handlers.sliding_sync import PerConnectionState, SlidingSyncConfig
from synapse.types.handlers.sliding_sync import (
PerConnectionState,
SlidingSyncConfig,
)
from synapse.types.state import StateFilter
from synapse.util.clock import Clock
@@ -3827,12 +3831,11 @@ class RequiredStateChangesTestParameters:
previous_required_state_map: dict[str, set[str]]
request_required_state_map: dict[str, set[str]]
state_deltas: StateMap[str]
expected_with_state_deltas: tuple[
Mapping[str, AbstractSet[str]] | None, StateFilter
]
expected_without_state_deltas: tuple[
Mapping[str, AbstractSet[str]] | None, StateFilter
]
expected_with_state_deltas: _RequiredStateChangesReturn
expected_without_state_deltas: _RequiredStateChangesReturn
previously_returned_lazy_user_ids: AbstractSet[str] = frozenset()
request_lazy_load_user_ids: AbstractSet[str] = frozenset()
class RequiredStateChangesTestCase(unittest.TestCase):
@@ -3848,8 +3851,12 @@ class RequiredStateChangesTestCase(unittest.TestCase):
request_required_state_map={"type1": {"state_key"}},
state_deltas={("type1", "state_key"): "$event_id"},
# No changes
expected_with_state_deltas=(None, StateFilter.none()),
expected_without_state_deltas=(None, StateFilter.none()),
expected_with_state_deltas=_RequiredStateChangesReturn(
None, StateFilter.none()
),
expected_without_state_deltas=_RequiredStateChangesReturn(
None, StateFilter.none()
),
),
),
(
@@ -3862,14 +3869,14 @@ class RequiredStateChangesTestCase(unittest.TestCase):
"type2": {"state_key"},
},
state_deltas={("type2", "state_key"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# We've added a type so we should persist the changed required state
# config.
{"type1": {"state_key"}, "type2": {"state_key"}},
# We should see the new type added
StateFilter.from_types([("type2", "state_key")]),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{"type1": {"state_key"}, "type2": {"state_key"}},
StateFilter.from_types([("type2", "state_key")]),
),
@@ -3885,7 +3892,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
"type2": {"state_key"},
},
state_deltas={("type2", "state_key"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# We've added a type so we should persist the changed required state
# config.
{"type1": {"state_key"}, "type2": {"state_key"}},
@@ -3894,7 +3901,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
[("type1", "state_key"), ("type2", "state_key")]
),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{"type1": {"state_key"}, "type2": {"state_key"}},
StateFilter.from_types(
[("type1", "state_key"), ("type2", "state_key")]
@@ -3909,14 +3916,14 @@ class RequiredStateChangesTestCase(unittest.TestCase):
previous_required_state_map={"type": {"state_key1"}},
request_required_state_map={"type": {"state_key1", "state_key2"}},
state_deltas={("type", "state_key2"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# We've added a key so we should persist the changed required state
# config.
{"type": {"state_key1", "state_key2"}},
# We should see the new state_keys added
StateFilter.from_types([("type", "state_key2")]),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{"type": {"state_key1", "state_key2"}},
StateFilter.from_types([("type", "state_key2")]),
),
@@ -3929,7 +3936,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
previous_required_state_map={"type": {"state_key1"}},
request_required_state_map={"type": {"state_key2", "state_key3"}},
state_deltas={("type", "state_key2"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# We've added a key so we should persist the changed required state
# config.
#
@@ -3940,7 +3947,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
[("type", "state_key2"), ("type", "state_key3")]
),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{"type": {"state_key1", "state_key2", "state_key3"}},
StateFilter.from_types(
[("type", "state_key2"), ("type", "state_key3")]
@@ -3964,7 +3971,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
},
request_required_state_map={"type1": {"state_key"}},
state_deltas={("type2", "state_key"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# Remove `type2` since there's been a change to that state,
# (persist the change to required state). That way next time,
# they request `type2`, we see that we haven't sent it before
@@ -3975,7 +3982,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# less state now
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
# `type2` is no longer requested but since that state hasn't
# changed, nothing should change (we should still keep track
# that we've sent `type2` before).
@@ -3998,7 +4005,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
},
request_required_state_map={},
state_deltas={("type2", "state_key"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# Remove `type2` since there's been a change to that state,
# (persist the change to required state). That way next time,
# they request `type2`, we see that we haven't sent it before
@@ -4009,7 +4016,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# less state now
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
# `type2` is no longer requested but since that state hasn't
# changed, nothing should change (we should still keep track
# that we've sent `type2` before).
@@ -4029,7 +4036,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
previous_required_state_map={"type": {"state_key1", "state_key2"}},
request_required_state_map={"type": {"state_key1"}},
state_deltas={("type", "state_key2"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# Remove `(type, state_key2)` since there's been a change
# to that state (persist the change to required state).
# That way next time, they request `(type, state_key2)`, we see
@@ -4041,7 +4048,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# less state now
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
# `(type, state_key2)` is no longer requested but since that
# state hasn't changed, nothing should change (we should still
# keep track that we've sent `(type, state_key1)` and `(type,
@@ -4073,11 +4080,11 @@ class RequiredStateChangesTestCase(unittest.TestCase):
("other_type", "state_key"): "$event_id",
},
# We've added a wildcard, so we persist the change and request everything
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
{"type1": {"state_key2"}, StateValues.WILDCARD: {"state_key"}},
StateFilter.all(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{"type1": {"state_key2"}, StateValues.WILDCARD: {"state_key"}},
StateFilter.all(),
),
@@ -4103,13 +4110,13 @@ class RequiredStateChangesTestCase(unittest.TestCase):
("other_type", "state_key"): "$event_id",
},
# We've removed a type wildcard, so we persist the change but don't request anything
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
{"type1": {"state_key2"}},
# We don't need to request anything more if they are requesting
# less state now
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{"type1": {"state_key2"}},
# We don't need to request anything more if they are requesting
# less state now
@@ -4129,11 +4136,11 @@ class RequiredStateChangesTestCase(unittest.TestCase):
state_deltas={("type2", "state_key"): "$event_id"},
# We've added a wildcard state_key, so we persist the change and
# request all of the state for that type
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
{"type1": {"state_key"}, "type2": {StateValues.WILDCARD}},
StateFilter.from_types([("type2", None)]),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{"type1": {"state_key"}, "type2": {StateValues.WILDCARD}},
StateFilter.from_types([("type2", None)]),
),
@@ -4151,7 +4158,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
state_deltas={("type2", "state_key"): "$event_id"},
# We've removed a state_key wildcard, so we persist the change and
# request nothing
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
{"type1": {"state_key"}},
# We don't need to request anything more if they are requesting
# less state now
@@ -4160,7 +4167,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# We've removed a state_key wildcard but there have been no matching
# state changes, so no changes needed, just persist the
# `request_required_state_map` as-is.
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
None,
# We don't need to request anything more if they are requesting
# less state now
@@ -4180,7 +4187,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
},
request_required_state_map={"type1": {"state_key1"}},
state_deltas={("type1", "state_key3"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# We've removed some state keys from the type, but only state_key3 was
# changed so only that one should be removed.
{"type1": {"state_key1", "state_key2"}},
@@ -4188,7 +4195,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# less state now
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
# No changes needed, just persist the
# `request_required_state_map` as-is
None,
@@ -4207,14 +4214,14 @@ class RequiredStateChangesTestCase(unittest.TestCase):
previous_required_state_map={},
request_required_state_map={"type1": {StateValues.ME}},
state_deltas={("type1", "@user:test"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# We've added a type so we should persist the changed required state
# config.
{"type1": {StateValues.ME}},
# We should see the new state_keys added
StateFilter.from_types([("type1", "@user:test")]),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{"type1": {StateValues.ME}},
StateFilter.from_types([("type1", "@user:test")]),
),
@@ -4229,7 +4236,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
previous_required_state_map={"type1": {StateValues.ME}},
request_required_state_map={},
state_deltas={("type1", "@user:test"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# Remove `type1` since there's been a change to that state,
# (persist the change to required state). That way next time,
# they request `type1`, we see that we haven't sent it before
@@ -4240,7 +4247,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# less state now
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
# `type1` is no longer requested but since that state hasn't
# changed, nothing should change (we should still keep track
# that we've sent `type1` before).
@@ -4260,14 +4267,14 @@ class RequiredStateChangesTestCase(unittest.TestCase):
previous_required_state_map={},
request_required_state_map={"type1": {"@user:test"}},
state_deltas={("type1", "@user:test"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# We've added a type so we should persist the changed required state
# config.
{"type1": {"@user:test"}},
# We should see the new state_keys added
StateFilter.from_types([("type1", "@user:test")]),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{"type1": {"@user:test"}},
StateFilter.from_types([("type1", "@user:test")]),
),
@@ -4282,7 +4289,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
previous_required_state_map={"type1": {"@user:test"}},
request_required_state_map={},
state_deltas={("type1", "@user:test"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# Remove `type1` since there's been a change to that state,
# (persist the change to required state). That way next time,
# they request `type1`, we see that we haven't sent it before
@@ -4293,7 +4300,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# less state now
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
# `type1` is no longer requested but since that state hasn't
# changed, nothing should change (we should still keep track
# that we've sent `type1` before).
@@ -4313,13 +4320,13 @@ class RequiredStateChangesTestCase(unittest.TestCase):
previous_required_state_map={},
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
state_deltas={(EventTypes.Member, "@user:test"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# If a "$LAZY" has been added or removed we always update the
# required state to what was requested for simplicity.
{EventTypes.Member: {StateValues.LAZY}},
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{EventTypes.Member: {StateValues.LAZY}},
StateFilter.none(),
),
@@ -4334,7 +4341,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
previous_required_state_map={EventTypes.Member: {StateValues.LAZY}},
request_required_state_map={},
state_deltas={(EventTypes.Member, "@user:test"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# If a "$LAZY" has been added or removed we always update the
# required state to what was requested for simplicity.
{},
@@ -4342,7 +4349,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# less state now
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
# `EventTypes.Member` is no longer requested but since that
# state hasn't changed, nothing should change (we should still
# keep track that we've sent `EventTypes.Member` before).
@@ -4361,41 +4368,40 @@ class RequiredStateChangesTestCase(unittest.TestCase):
we're sending down another response without any timeline events.
""",
RequiredStateChangesTestParameters(
previous_required_state_map={
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
}
},
previous_required_state_map={EventTypes.Member: {StateValues.LAZY}},
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
previously_returned_lazy_user_ids={"@user2:test", "@user3:test"},
request_lazy_load_user_ids=set(),
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# The `request_required_state_map` hasn't changed
None,
# We don't need to request anything more if they are requesting
# less state now
StateFilter.none(),
# Previous request did not include any explicit members,
# so there are no extra users to add to the lazy cache.
extra_users_to_add_to_lazy_cache=frozenset(),
# Remove "@user2:test" since that state has changed and is no
# longer being requested. Since something was removed,
# we should persist the change to required state. That way next
# time, they request "@user2:test", we see that we haven't sent
# it before and send the new state. (we should still keep track
# that we've sent specific `EventTypes.Member` before)
{
EventTypes.Member: {
StateValues.LAZY,
"@user3:test",
}
},
# We don't need to request anything more if they are requesting
# less state now
StateFilter.none(),
lazy_members_invalidated={"@user2:test"},
),
expected_without_state_deltas=(
# We're not requesting any specific `EventTypes.Member` now but
# since that state hasn't changed, nothing should change (we
# should still keep track that we've sent specific
# `EventTypes.Member` before).
expected_without_state_deltas=_RequiredStateChangesReturn(
# The `request_required_state_map` hasn't changed
None,
# We don't need to request anything more if they are requesting
# less state now
StateFilter.none(),
# Previous request did not include any explicit members,
# so there are no extra users to add to the lazy cache.
extra_users_to_add_to_lazy_cache=frozenset(),
# Nothing should change (we should still keep track that
# we've sent specific `EventTypes.Member` before).
lazy_members_invalidated=frozenset(),
),
),
),
@@ -4407,50 +4413,37 @@ class RequiredStateChangesTestCase(unittest.TestCase):
we're sending down another response with a new event from user4.
""",
RequiredStateChangesTestParameters(
previous_required_state_map={
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
}
},
request_required_state_map={
EventTypes.Member: {StateValues.LAZY, "@user4:test"}
},
previous_required_state_map={EventTypes.Member: {StateValues.LAZY}},
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
previously_returned_lazy_user_ids={"@user2:test", "@user3:test"},
request_lazy_load_user_ids={"@user4:test"},
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
expected_with_state_deltas=(
# Since "@user4:test" was added, we should persist the changed
# required state config.
#
# Also remove "@user2:test" since that state has changed and is no
# longer being requested. Since something was removed,
# we should also persist the change to required state. That way next
# time, they request "@user2:test", we see that we haven't sent
# it before and send the new state. (we should still keep track
expected_with_state_deltas=_RequiredStateChangesReturn(
# The `request_required_state_map` hasn't changed
None,
# We should see the new state_keys added
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
# Previous request did not include any explicit members,
# so there are no extra users to add to the lazy cache.
extra_users_to_add_to_lazy_cache=frozenset(),
# Remove "@user2:test" since that state has changed and
# is no longer being requested. Since something
# was removed, we should also persist the change to
# required state. That way next time, they request
# "@user2:test", we see that we haven't sent it before
# and send the new state. (we should still keep track
# that we've sent specific `EventTypes.Member` before)
{
EventTypes.Member: {
StateValues.LAZY,
"@user3:test",
"@user4:test",
}
},
# We should see the new state_keys added
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
lazy_members_invalidated={"@user2:test"},
),
expected_without_state_deltas=(
# Since "@user4:test" was added, we should persist the changed
# required state config.
{
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
"@user4:test",
}
},
expected_without_state_deltas=_RequiredStateChangesReturn(
# The `request_required_state_map` hasn't changed
None,
# We should see the new state_keys added
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
# Previous request did not include any explicit members,
# so there are no extra users to add to the lazy cache.
extra_users_to_add_to_lazy_cache=frozenset(),
lazy_members_invalidated=frozenset(),
),
),
),
@@ -4464,40 +4457,81 @@ class RequiredStateChangesTestCase(unittest.TestCase):
EventTypes.Member: {"@user2:test", "@user3:test"}
},
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
previously_returned_lazy_user_ids=frozenset(),
request_lazy_load_user_ids=frozenset(),
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# Since `StateValues.LAZY` was added, we should persist the
# changed required state config.
#
# Also remove "@user2:test" since that state has changed and is no
# longer being requested. Since something was removed,
# we should also persist the change to required state. That way next
# time, they request "@user2:test", we see that we haven't sent
# it before and send the new state. (we should still keep track
# that we've sent specific `EventTypes.Member` before)
{
EventTypes.Member: {
StateValues.LAZY,
"@user3:test",
}
},
# We don't need to request anything more if they are requesting
# less state now
{EventTypes.Member: {StateValues.LAZY}},
# No users are being lazy loaded, so nothing to request.
StateFilter.none(),
# Remember the fact that we've sent @user3 down before,
# but not @user2 as that has been invalidated.
extra_users_to_add_to_lazy_cache={"@user3:test"},
# Nothing to invalidate as there are no existing lazy members.
lazy_members_invalidated=frozenset(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
# Since `StateValues.LAZY` was added, we should persist the
# changed required state config.
{
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
}
},
# We don't need to request anything more if they are requesting
# less state now
{EventTypes.Member: {StateValues.LAZY}},
# No users are being lazy loaded, so nothing to request.
StateFilter.none(),
# Remember the fact that we've sent the users down before.
extra_users_to_add_to_lazy_cache={"@user2:test", "@user3:test"},
# Nothing to invalidate as there are no existing lazy members.
lazy_members_invalidated=frozenset(),
),
),
),
(
"state_key_expand_lazy_keep_previous_memberships_need_previous_sent",
"""
Test expanding the `required_state` to lazy-loading room
members. If a previously explicit membership is requested then
we should not send it again (as it was already sent before).
""",
RequiredStateChangesTestParameters(
previous_required_state_map={
EventTypes.Member: {"@user2:test", "@user3:test"}
},
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
previously_returned_lazy_user_ids=frozenset(),
request_lazy_load_user_ids={"@user3:test"},
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
expected_with_state_deltas=_RequiredStateChangesReturn(
# Since `StateValues.LAZY` was added, we should persist the
# changed required state config.
{EventTypes.Member: {StateValues.LAZY}},
# We have already sent @user3 down before.
#
# `@user3:test` is required for lazy loading, but we've
# already sent it down before (due to it being in
# `previous_required_state_map`), so we don't need to
# request it again.
StateFilter.none(),
# Remember the fact that we've sent @user3 down before,
# but not @user2 as that has been invalidated.
extra_users_to_add_to_lazy_cache={"@user3:test"},
# Nothing to invalidate as there are no existing lazy members.
lazy_members_invalidated=frozenset(),
),
expected_without_state_deltas=_RequiredStateChangesReturn(
# Since `StateValues.LAZY` was added, we should persist the
# changed required state config.
{EventTypes.Member: {StateValues.LAZY}},
# We have already sent @user3 down before.
#
# `@user3:test` is required for lazy loading, but we've
# already sent it down before (due to it being in
# `previous_required_state_map`), so we don't need to
# request it again.
StateFilter.none(),
# Remember the fact that we've sent the users down before.
extra_users_to_add_to_lazy_cache={"@user2:test", "@user3:test"},
# Nothing to invalidate as there are no existing lazy members.
lazy_members_invalidated=frozenset(),
),
),
),
@@ -4507,36 +4541,33 @@ class RequiredStateChangesTestCase(unittest.TestCase):
Test retracting the `required_state` to no longer lazy-loading room members.
""",
RequiredStateChangesTestParameters(
previous_required_state_map={
EventTypes.Member: {
StateValues.LAZY,
"@user2:test",
"@user3:test",
}
},
previous_required_state_map={EventTypes.Member: {StateValues.LAZY}},
request_required_state_map={},
previously_returned_lazy_user_ids={"@user2:test", "@user3:test"},
request_lazy_load_user_ids=set(),
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# Remove `EventTypes.Member` since there's been a change to that
# state, (persist the change to required state). That way next
# time, they request `EventTypes.Member`, we see that we haven't
# sent it before and send the new state. (if we were tracking
# that we sent any other state, we should still keep track
# that).
#
# This acts the same as the `simple_remove_type` test. It's
# possible that we could remember the specific `state_keys` that
# we have sent down before but this currently just acts the same
# as if a whole `type` was removed. Perhaps it's good that we
# "garbage collect" and forget what we've sent before for a
# given `type` when the client stops caring about a certain
# `type`.
# state, (persist the change to required state).
{},
# We don't need to request anything more if they are requesting
# less state now
StateFilter.none(),
# Previous request did not include any explicit members,
# so there are no extra users to add to the lazy cache.
extra_users_to_add_to_lazy_cache=frozenset(),
# Explicitly remove the now invalidated @user2:test
# membership.
#
# We don't invalidate @user3:test as that membership
# hasn't changed. We continue to store the existing lazy
# members since they might be useful for future
# requests. (Alternatively, we could invalidate all
# members in the room when the client stops lazy
# loading, but we opt to keep track of them).
lazy_members_invalidated={"@user2:test"},
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
# `EventTypes.Member` is no longer requested but since that
# state hasn't changed, nothing should change (we should still
# keep track that we've sent `EventTypes.Member` before).
@@ -4544,13 +4575,20 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# We don't need to request anything more if they are requesting
# less state now
StateFilter.none(),
# Previous request did not include any explicit members,
# so there are no extra users to add to the lazy cache.
extra_users_to_add_to_lazy_cache=frozenset(),
# Nothing has been invalidated.
lazy_members_invalidated=frozenset(),
),
),
),
(
"state_key_retract_lazy_keep_previous_memberships_with_new_memberships",
"state_key_retract_lazy_keep_previous_explicit_memberships",
"""
Test retracting the `required_state` to no longer lazy-loading room members.
Test removing explicit memberships from the `required_state`
when lazy-loading room members tracks previously sent
memberships.
""",
RequiredStateChangesTestParameters(
previous_required_state_map={
@@ -4560,39 +4598,144 @@ class RequiredStateChangesTestCase(unittest.TestCase):
"@user3:test",
}
},
request_required_state_map={EventTypes.Member: {"@user4:test"}},
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
previously_returned_lazy_user_ids=frozenset(),
request_lazy_load_user_ids={"@user3:test"},
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
# Since an explicit membership was removed, we record
# the new required state config and move them to lazy
# members.
{EventTypes.Member: {StateValues.LAZY}},
# We have already sent @user3 down before.
#
# `@user3:test` is required for lazy loading, but we've
# already sent it down before (due to it being in
# `previous_required_state_map`), so we don't need to
# request it again.
StateFilter.none(),
# Remember the fact that we've sent @user3 down before,
# but not @user2 as that has been invalidated.
extra_users_to_add_to_lazy_cache={"@user3:test"},
# Nothing to invalidate as there are no existing lazy members.
lazy_members_invalidated=frozenset(),
),
expected_without_state_deltas=_RequiredStateChangesReturn(
# While some explicit memberships were removed, there were no
# state changes, so we don't need to persist the new required
# state config yet.
None,
# We have already sent @user3 down before.
#
# `@user3:test` is required for lazy loading, but we've
# already sent it down before (due to it being in
# `previous_required_state_map`), so we don't need to
# request it again.
StateFilter.none(),
# Remember the fact that we've sent the users down before.
extra_users_to_add_to_lazy_cache=frozenset(),
# Nothing to invalidate as there are no existing lazy members.
lazy_members_invalidated=frozenset(),
),
),
),
(
"state_key_retract_lazy_keep_previous_explicit_me_memberships",
"""
Test removing explicit $ME memberships from the `required_state`
when lazy-loading room members tracks previously sent
memberships.
""",
RequiredStateChangesTestParameters(
previous_required_state_map={
EventTypes.Member: {
StateValues.LAZY,
StateValues.ME,
"@user2:test",
}
},
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
previously_returned_lazy_user_ids=frozenset(),
request_lazy_load_user_ids={"@user:test"},
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
expected_with_state_deltas=_RequiredStateChangesReturn(
# Since an explicit membership was removed, we record
# the new required state config and move them to lazy
# members.
{EventTypes.Member: {StateValues.LAZY}},
# We have already sent @user down before.
#
# `@user:test` is required for lazy loading, but we've
# already sent it down before (due to `StateValues.ME`
# being in `previous_required_state_map`), so we don't
# need to request it again.
StateFilter.none(),
# Remember the fact that we've sent @user down before,
# but not @user2 as that has been invalidated.
extra_users_to_add_to_lazy_cache={"@user:test"},
# Nothing to invalidate as there are no existing lazy members.
lazy_members_invalidated=frozenset(),
),
expected_without_state_deltas=_RequiredStateChangesReturn(
# While some explicit memberships were removed, there were no
# state changes, so we don't need to persist the new required
# state config yet.
None,
# We have already sent @user down before.
#
# `@user:test` is required for lazy loading, but we've
# already sent it down before (due to `StateValues.ME`
# being in `previous_required_state_map`), so we don't
# need to request it again.
StateFilter.none(),
# No relevant state has changed and we don't persist the
# changed required_state_map, so we don't yet move the
# $ME state to the lazy cache.
extra_users_to_add_to_lazy_cache=frozenset(),
# Nothing to invalidate as there are no existing lazy members.
lazy_members_invalidated=frozenset(),
),
),
),
(
"state_key_retract_lazy_keep_previous_memberships_with_new_memberships",
"""
Test retracting the `required_state` to no longer lazy-loading room members.
""",
RequiredStateChangesTestParameters(
previous_required_state_map={EventTypes.Member: {StateValues.LAZY}},
request_required_state_map={EventTypes.Member: {"@user4:test"}},
previously_returned_lazy_user_ids={"@user2:test", "@user3:test"},
request_lazy_load_user_ids=frozenset(),
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
expected_with_state_deltas=_RequiredStateChangesReturn(
# Since "@user4:test" was added, we should persist the changed
# required state config.
{EventTypes.Member: {"@user4:test"}},
# We should see the new state_keys added
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
# Previous request did not include any explicit members,
# so there are no extra users to add to the lazy cache.
extra_users_to_add_to_lazy_cache=frozenset(),
# Also remove "@user2:test" since that state has changed and is no
# longer being requested. Since something was removed,
# we should also persist the change to required state. That way next
# time, they request "@user2:test", we see that we haven't sent
# it before and send the new state. (we should still keep track
# that we've sent specific `EventTypes.Member` before)
{
EventTypes.Member: {
"@user3:test",
"@user4:test",
}
},
# We should see the new state_keys added
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
lazy_members_invalidated={"@user2:test"},
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
# Since "@user4:test" was added, we should persist the changed
# required state config.
{
EventTypes.Member: {
"@user2:test",
"@user3:test",
"@user4:test",
}
},
{EventTypes.Member: {"@user4:test"}},
# We should see the new state_keys added
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
# Previous request did not include any explicit members,
# so there are no extra users to add to the lazy cache.
extra_users_to_add_to_lazy_cache=frozenset(),
# We don't invalidate user2 as they haven't changed
lazy_members_invalidated=frozenset(),
),
),
),
@@ -4613,7 +4756,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# room required state config to match the request. And since we were previously
# already fetching everything, we don't have to fetch anything now that they've
# narrowed.
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
{
StateValues.WILDCARD: {
"state_key1",
@@ -4623,7 +4766,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
},
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{
StateValues.WILDCARD: {
"state_key1",
@@ -4649,11 +4792,11 @@ class RequiredStateChangesTestCase(unittest.TestCase):
},
state_deltas={("type1", "state_key1"): "$event_id"},
# We've added a wildcard, so we persist the change and request everything
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
{StateValues.WILDCARD: {StateValues.WILDCARD}},
StateFilter.all(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{StateValues.WILDCARD: {StateValues.WILDCARD}},
StateFilter.all(),
),
@@ -4673,7 +4816,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# request. And since we were previously already fetching
# everything, we don't have to fetch anything now that they've
# narrowed.
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
{
"type1": {
"state_key1",
@@ -4683,7 +4826,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
},
StateFilter.none(),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{
"type1": {
"state_key1",
@@ -4708,11 +4851,11 @@ class RequiredStateChangesTestCase(unittest.TestCase):
# update the effective room required state config to match the
# request. And we need to request all of the state for that type
# because we previously, only sent down a few keys.
expected_with_state_deltas=(
expected_with_state_deltas=_RequiredStateChangesReturn(
{"type1": {StateValues.WILDCARD, "state_key2", "state_key3"}},
StateFilter.from_types([("type1", None)]),
),
expected_without_state_deltas=(
expected_without_state_deltas=_RequiredStateChangesReturn(
{
"type1": {
StateValues.WILDCARD,
@@ -4734,42 +4877,66 @@ class RequiredStateChangesTestCase(unittest.TestCase):
test_parameters: RequiredStateChangesTestParameters,
) -> None:
# Without `state_deltas`
changed_required_state_map, added_state_filter = _required_state_changes(
state_changes = _required_state_changes(
user_id="@user:test",
prev_required_state_map=test_parameters.previous_required_state_map,
request_required_state_map=test_parameters.request_required_state_map,
previously_returned_lazy_user_ids=test_parameters.previously_returned_lazy_user_ids,
request_lazy_load_user_ids=test_parameters.request_lazy_load_user_ids,
state_deltas={},
)
self.assertEqual(
changed_required_state_map,
test_parameters.expected_without_state_deltas[0],
state_changes.changed_required_state_map,
test_parameters.expected_without_state_deltas.changed_required_state_map,
"changed_required_state_map does not match (without state_deltas)",
)
self.assertEqual(
added_state_filter,
test_parameters.expected_without_state_deltas[1],
state_changes.added_state_filter,
test_parameters.expected_without_state_deltas.added_state_filter,
"added_state_filter does not match (without state_deltas)",
)
self.assertEqual(
state_changes.lazy_members_invalidated,
test_parameters.expected_without_state_deltas.lazy_members_invalidated,
"lazy_members_invalidated does not match (without state_deltas)",
)
self.assertEqual(
state_changes.extra_users_to_add_to_lazy_cache,
test_parameters.expected_without_state_deltas.extra_users_to_add_to_lazy_cache,
"extra_users_to_add_to_lazy_cache does not match (without state_deltas)",
)
# With `state_deltas`
changed_required_state_map, added_state_filter = _required_state_changes(
state_changes = _required_state_changes(
user_id="@user:test",
prev_required_state_map=test_parameters.previous_required_state_map,
request_required_state_map=test_parameters.request_required_state_map,
previously_returned_lazy_user_ids=test_parameters.previously_returned_lazy_user_ids,
request_lazy_load_user_ids=test_parameters.request_lazy_load_user_ids,
state_deltas=test_parameters.state_deltas,
)
self.assertEqual(
changed_required_state_map,
test_parameters.expected_with_state_deltas[0],
state_changes.changed_required_state_map,
test_parameters.expected_with_state_deltas.changed_required_state_map,
"changed_required_state_map does not match (with state_deltas)",
)
self.assertEqual(
added_state_filter,
test_parameters.expected_with_state_deltas[1],
state_changes.added_state_filter,
test_parameters.expected_with_state_deltas.added_state_filter,
"added_state_filter does not match (with state_deltas)",
)
self.assertEqual(
state_changes.lazy_members_invalidated,
test_parameters.expected_with_state_deltas.lazy_members_invalidated,
"lazy_members_invalidated does not match (with state_deltas)",
)
self.assertEqual(
state_changes.extra_users_to_add_to_lazy_cache,
test_parameters.expected_with_state_deltas.extra_users_to_add_to_lazy_cache,
"extra_users_to_add_to_lazy_cache does not match (with state_deltas)",
)
@parameterized.expand(
[
@@ -4805,12 +4972,16 @@ class RequiredStateChangesTestCase(unittest.TestCase):
}
# (function under test)
changed_required_state_map, added_state_filter = _required_state_changes(
state_changes = _required_state_changes(
user_id="@user:test",
prev_required_state_map=previous_required_state_map,
request_required_state_map=request_required_state_map,
previously_returned_lazy_user_ids=frozenset(),
request_lazy_load_user_ids=frozenset(),
state_deltas={},
)
changed_required_state_map = state_changes.changed_required_state_map
assert changed_required_state_map is not None
# We should only remember up to the maximum number of state keys
@@ -4874,12 +5045,16 @@ class RequiredStateChangesTestCase(unittest.TestCase):
)
# (function under test)
changed_required_state_map, added_state_filter = _required_state_changes(
state_changes = _required_state_changes(
user_id="@user:test",
prev_required_state_map=previous_required_state_map,
request_required_state_map=request_required_state_map,
previously_returned_lazy_user_ids=frozenset(),
request_lazy_load_user_ids=frozenset(),
state_deltas={},
)
changed_required_state_map = state_changes.changed_required_state_map
assert changed_required_state_map is not None
# Should include all of the requested state

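The mechanical change running through the hunks above is that `_required_state_changes` no longer returns a bare `(changed_required_state_map, added_state_filter)` 2-tuple but a named container (`_RequiredStateChangesReturn`) carrying the two original fields plus the new lazy-member bookkeeping (`extra_users_to_add_to_lazy_cache`, `lazy_members_invalidated`). A minimal sketch of the pattern, with illustrative stand-in types rather than Synapse's real `StateFilter`:

```python
from dataclasses import dataclass
from typing import AbstractSet, FrozenSet, Mapping, Optional

# Illustrative stand-in for Synapse's required-state maps
# (event type -> set of state keys).
RequiredStateMap = Mapping[str, AbstractSet[str]]


@dataclass(frozen=True)
class RequiredStateChangesReturn:
    """Named return value: new fields with defaults can be appended
    without disturbing callers that only read the original two."""

    changed_required_state_map: Optional[RequiredStateMap]
    added_state_filter: object  # StateFilter in the real code
    extra_users_to_add_to_lazy_cache: FrozenSet[str] = frozenset()
    lazy_members_invalidated: FrozenSet[str] = frozenset()


def required_state_changes_demo() -> RequiredStateChangesReturn:
    # Before the refactor this would have been a bare 2-tuple, forcing
    # every caller (and every test expectation) to unpack positionally.
    return RequiredStateChangesReturn(
        changed_required_state_map=None,
        added_state_filter=None,
        lazy_members_invalidated=frozenset({"@user2:test"}),
    )


changes = required_state_changes_demo()
# Attribute access replaces tuple unpacking at the call sites, and the
# assertions can name the field they are checking.
assert changes.changed_required_state_map is None
assert changes.lazy_members_invalidated == frozenset({"@user2:test"})
assert changes.extra_users_to_add_to_lazy_cache == frozenset()
```

This is why the test helper can now assert on each field with its own failure message instead of indexing `expected_with_state_deltas[0]` and `[1]`.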

@@ -71,14 +71,43 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
return resources
def _ensure_quarantined(
self, admin_user_tok: str, server_and_media_id: str
self,
user_tok: str,
server_and_media_id: str,
include_bypass_param: bool = False,
) -> None:
"""Ensure a piece of media is quarantined when trying to access it."""
"""Ensure a piece of media is quarantined when trying to access it.
The include_bypass_param flag adds the admin_unsafely_bypass_quarantine
query parameter to the request, which is still expected to fail to
download the media.
"""
if include_bypass_param:
query_string = "?admin_unsafely_bypass_quarantine=true"
channel = self.make_request(
"GET",
f"/_matrix/client/v1/media/download/{server_and_media_id}{query_string}",
shorthand=False,
access_token=user_tok,
)
# Non-admins can't bypass, so this should fail regardless of whether the
# media is actually quarantined.
self.assertEqual(
400,
channel.code,
msg=(
"Expected to receive a 400 when bypassing quarantined media: %s"
% server_and_media_id
),
)
# Repeat the request, this time without the bypass parameter.
channel = self.make_request(
"GET",
f"/_matrix/client/v1/media/download/{server_and_media_id}",
shorthand=False,
access_token=admin_user_tok,
access_token=user_tok,
)
# Should be quarantined
@@ -91,6 +120,62 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
),
)
def test_admin_can_bypass_quarantine(self) -> None:
self.register_user("admin", "pass", admin=True)
admin_user_tok = self.login("admin", "pass")
# Upload some media
response = self.helper.upload_media(SMALL_PNG, tok=admin_user_tok)
# Extract media ID from the response
server_name_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
server_name, media_id = server_name_and_media_id.split("/")
# Attempt to access the media
channel = self.make_request(
"GET",
f"/_matrix/client/v1/media/download/{server_name_and_media_id}",
shorthand=False,
access_token=admin_user_tok,
)
# Should be successful
self.assertEqual(200, channel.code)
# Quarantine the media
url = "/_synapse/admin/v1/media/quarantine/%s/%s" % (
urllib.parse.quote(server_name),
urllib.parse.quote(media_id),
)
channel = self.make_request(
"POST",
url,
access_token=admin_user_tok,
)
self.pump(1.0)
self.assertEqual(200, channel.code, msg=channel.json_body)
# Now access it *without* the bypass parameter - this should fail (as expected).
self._ensure_quarantined(
admin_user_tok, server_name_and_media_id, include_bypass_param=False
)
# Now access it *with* the bypass parameter - this should work
channel = self.make_request(
"GET",
f"/_matrix/client/v1/media/download/{server_name_and_media_id}?admin_unsafely_bypass_quarantine=true",
shorthand=False,
access_token=admin_user_tok,
)
self.assertEqual(
200,
channel.code,
msg=(
"Expected to receive a 200 on accessing (with bypass) quarantined media: %s"
% server_name_and_media_id
),
)
@parameterized.expand(
[
# Attempt quarantine media APIs as non-admin
@@ -154,8 +239,14 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
self.pump(1.0)
self.assertEqual(200, channel.code, msg=channel.json_body)
# Attempt to access the media (and ensure non-admins can't download it, even
# with a bypass parameter). Admins cannot download it without the bypass param.
self._ensure_quarantined(
non_admin_user_tok, server_name_and_media_id, include_bypass_param=True
)
self._ensure_quarantined(
admin_user_tok, server_name_and_media_id, include_bypass_param=False
)
@parameterized.expand(
[
@@ -214,9 +305,21 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
server_and_media_id_1 = mxc_1[6:]
server_and_media_id_2 = mxc_2[6:]
# Test that we cannot download any of the media anymore, especially with the
# bypass parameter set. Admins cannot download the media without supplying the
# bypass parameter, so we check that too.
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_1, include_bypass_param=True
)
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_2, include_bypass_param=True
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_1, include_bypass_param=False
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_2, include_bypass_param=False
)
def test_quarantine_all_media_by_user(self) -> None:
self.register_user("user_admin", "pass", admin=True)
@@ -263,10 +366,27 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
channel.json_body, {"num_quarantined": 3}, "Expected 3 quarantined items"
)
# Attempt to access each piece of media, ensuring that it can't be downloaded
# even with a bypass parameter. Admins should not be able to download the media
# either when not supplying the bypass parameter, so we check that too.
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_1, include_bypass_param=True
)
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_2, include_bypass_param=True
)
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_3, include_bypass_param=True
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_1, include_bypass_param=False
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_2, include_bypass_param=False
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_3, include_bypass_param=False
)
def test_cannot_quarantine_safe_media(self) -> None:
self.register_user("user_admin", "pass", admin=True)
@@ -307,8 +427,14 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
)
# Attempt to access each piece of media, the first should fail, the
# second should succeed. We check both the non-admin user with a bypass
# parameter, and the admin user without.
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_1, include_bypass_param=True
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_1, include_bypass_param=False
)
# Attempt to access each piece of media
channel = self.make_request(

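For reference, the bypass these tests exercise is just an extra query parameter on the ordinary client-server media download path. A minimal sketch of the URL construction (a hypothetical helper, not Synapse code):

```python
def media_download_url(server_and_media_id: str, bypass: bool = False) -> str:
    """Build the client-server media download path, optionally appending the
    admin-only quarantine-bypass parameter used in the tests above."""
    url = f"/_matrix/client/v1/media/download/{server_and_media_id}"
    if bypass:
        # Only server admins may use this; non-admins still get a 400.
        url += "?admin_unsafely_bypass_quarantine=true"
    return url
```

The tests above assert that only an admin supplying this parameter can download quarantined media; the same request from a non-admin, or from an admin without the parameter, fails.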

@@ -756,6 +756,112 @@ class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
self.assertFalse(os.path.exists(local_path))
class ListQuarantinedMediaTestCase(_AdminMediaTests):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.server_name = hs.hostname
@parameterized.expand(["local", "remote"])
def test_no_auth(self, kind: str) -> None:
"""
Try to list quarantined media without authentication.
"""
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=%s" % (kind,),
)
self.assertEqual(401, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["local", "remote"])
def test_requester_is_not_admin(self, kind: str) -> None:
"""
If the user is not a server admin, an error is returned.
"""
self.other_user = self.register_user("user", "pass")
self.other_user_token = self.login("user", "pass")
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=%s" % (kind,),
access_token=self.other_user_token,
)
self.assertEqual(403, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_list_quarantined_media(self) -> None:
"""
Ensure we actually get results for each page. We can't really test that
remote media is quarantined, but we can test that local media is.
"""
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
def _upload() -> str:
return self.helper.upload_media(
SMALL_PNG, tok=self.admin_user_tok, expect_code=200
)["content_uri"][6:].split("/")[1] # Cut off 'mxc://' and domain
self.media_id_1 = _upload()
self.media_id_2 = _upload()
self.media_id_3 = _upload()
def _quarantine(media_id: str) -> None:
channel = self.make_request(
"POST",
"/_synapse/admin/v1/media/quarantine/%s/%s"
% (
self.server_name,
media_id,
),
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
_quarantine(self.media_id_1)
_quarantine(self.media_id_2)
_quarantine(self.media_id_3)
# Page 1
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=local&from=0&limit=1",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(1, len(channel.json_body["media"]))
# Page 2
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=local&from=1&limit=1",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(1, len(channel.json_body["media"]))
# Page 3
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=local&from=2&limit=1",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(1, len(channel.json_body["media"]))
# Page 4 (no media)
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=local&from=3&limit=1",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(0, len(channel.json_body["media"]))
class QuarantineMediaByIDTestCase(_AdminMediaTests):
def upload_media_and_return_media_id(self, data: bytes) -> str:
# Upload some media into the room

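The four page requests above follow a simple `from`/`limit` offset scheme that stops on an empty page. A client-side sketch of that loop (`fetch` is a placeholder for an authenticated GET to `/_synapse/admin/v1/media/quarantined?kind=local&from=...&limit=...`):

```python
from typing import Callable, Iterator

def iter_quarantined_media(
    fetch: Callable[[int, int], dict], limit: int = 100
) -> Iterator[dict]:
    """Page through the quarantined-media admin API exercised above.

    Stops when a page comes back with an empty "media" list, mirroring
    the "Page 4 (no media)" case in the test.
    """
    frm = 0
    while True:
        page = fetch(frm, limit)["media"]
        if not page:
            return
        yield from page
        frm += len(page)
```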

@@ -2976,6 +2976,120 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(private_room_id, channel.json_body["joined_rooms"][0])
def test_joined_rooms(self) -> None:
"""
Test joined_rooms admin endpoint.
"""
channel = self.make_request(
"POST",
f"/_matrix/client/v3/join/{self.public_room_id}",
content={"user_id": self.second_user_id},
access_token=self.second_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(self.public_room_id, channel.json_body["room_id"])
channel = self.make_request(
"GET",
f"/_synapse/admin/v1/users/{self.second_user_id}/joined_rooms",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(self.public_room_id, channel.json_body["joined_rooms"][0])
def test_memberships(self) -> None:
"""
Test user memberships admin endpoint.
"""
channel = self.make_request(
"POST",
f"/_matrix/client/v3/join/{self.public_room_id}",
content={"user_id": self.second_user_id},
access_token=self.second_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
other_room_id = self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok
)
channel = self.make_request(
"POST",
f"/_matrix/client/v3/join/{other_room_id}",
content={"user_id": self.second_user_id},
access_token=self.second_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
channel = self.make_request(
"GET",
f"/_synapse/admin/v1/users/{self.second_user_id}/memberships",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(
{
"memberships": {
self.public_room_id: Membership.JOIN,
other_room_id: Membership.JOIN,
}
},
channel.json_body,
)
channel = self.make_request(
"POST",
f"/_matrix/client/v3/rooms/{other_room_id}/leave",
content={"user_id": self.second_user_id},
access_token=self.second_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
invited_room_id = self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok
)
channel = self.make_request(
"POST",
f"/_matrix/client/v3/rooms/{invited_room_id}/invite",
content={"user_id": self.second_user_id},
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
banned_room_id = self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok
)
channel = self.make_request(
"POST",
f"/_matrix/client/v3/rooms/{banned_room_id}/ban",
content={"user_id": self.second_user_id},
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
channel = self.make_request(
"GET",
f"/_synapse/admin/v1/users/{self.second_user_id}/memberships",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(
{
"memberships": {
self.public_room_id: Membership.JOIN,
other_room_id: Membership.LEAVE,
invited_room_id: Membership.INVITE,
banned_room_id: Membership.BAN,
}
},
channel.json_body,
)
def test_context_as_non_admin(self) -> None:
"""
Test that, without being admin, one cannot use the context admin API


@@ -690,7 +690,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
user1_tok = self.login(user1_id, "pass")
# Create a remote invite room without any `unsigned.invite_room_state`
_remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
user1_id, None
)
@@ -760,7 +760,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
# Create a remote invite room with some `unsigned.invite_room_state`
# indicating that the room is encrypted.
remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
user1_id,
[
StrippedStateEvent(
@@ -849,7 +849,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
# Create a remote invite room with some `unsigned.invite_room_state`
# but don't set any room encryption event.
remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
user1_id,
[
StrippedStateEvent(
@@ -1484,7 +1484,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
user1_tok = self.login(user1_id, "pass")
# Create a remote invite room without any `unsigned.invite_room_state`
_remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
user1_id, None
)
@@ -1554,7 +1554,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
# Create a remote invite room with some `unsigned.invite_room_state` indicating
# that it is a space room
remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
user1_id,
[
StrippedStateEvent(
@@ -1637,7 +1637,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
# Create a remote invite room with some `unsigned.invite_room_state`
# but the create event does not specify a room type (normal room)
remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
user1_id,
[
StrippedStateEvent(


@@ -23,6 +23,7 @@ from synapse.api.constants import EventContentFields, EventTypes, JoinRules, Mem
from synapse.handlers.sliding_sync import StateValues
from synapse.rest.client import knock, login, room, sync
from synapse.server import HomeServer
from synapse.storage.databases.main.events import DeltaState, SlidingSyncTableChanges
from synapse.util.clock import Clock
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
@@ -642,11 +643,6 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
# Appears because there is a message in the timeline from this user
state_map[(EventTypes.Member, user4_id)],
# Appears because there is a membership event in the timeline from this user
@@ -841,6 +837,437 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
exact=True,
)
def test_lazy_loading_room_members_limited_sync(self) -> None:
"""Test that when using lazy loading for room members, if a limited sync
misses a membership change, we include that membership change the next
time the user in question sends a timeline event.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Send a message from each user to the room so that both memberships are sent down.
self.helper.send(room_id1, "1", tok=user1_tok)
self.helper.send(room_id1, "2", tok=user2_tok)
# Make a first sync with lazy loading for the room members to establish
# a position
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 2,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# We should see both membership events in required_state
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
state_map[(EventTypes.Member, user2_id)],
},
exact=True,
)
# User2 changes their display name (causing a membership change)
self.helper.send_state(
room_id1,
event_type=EventTypes.Member,
state_key=user2_id,
body={
EventContentFields.MEMBERSHIP: Membership.JOIN,
EventContentFields.MEMBERSHIP_DISPLAYNAME: "New Name",
},
tok=user2_tok,
)
# Send a couple of messages to the room to push out the membership change
self.helper.send(room_id1, "3", tok=user1_tok)
self.helper.send(room_id1, "4", tok=user1_tok)
# Make an incremental Sliding Sync request
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# The membership change should *not* be included yet as user2 doesn't
# have any events in the timeline.
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1].get("required_state", []),
set(),
exact=True,
)
# Now user2 sends a message to the room
self.helper.send(room_id1, "5", tok=user2_tok)
# Make another incremental Sliding Sync request
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# The membership change should now be included as user2 has an event
# in the timeline.
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1].get("required_state", []),
{
state_map[(EventTypes.Member, user2_id)],
},
exact=True,
)
def test_lazy_loading_room_members_across_multiple_rooms(self) -> None:
"""Test that lazy loading room members are tracked per-room correctly."""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
# Create two rooms with both users in them and send a message in each
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.send(room_id1, "room1-msg1", tok=user2_tok)
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id2, user1_id, tok=user1_tok)
self.helper.send(room_id2, "room2-msg1", tok=user2_tok)
# Make a sync with lazy loading for the room members to establish
# a position
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 1,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# We expect to see only user2's membership in both rooms
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user2_id)],
},
exact=True,
)
# Send a message in room1 from user1
self.helper.send(room_id1, "room1-msg2", tok=user1_tok)
# Make an incremental Sliding Sync request and check that we get user1's
# membership.
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
# Send a message in room2 from user1
self.helper.send(room_id2, "room2-msg2", tok=user1_tok)
# Make an incremental Sliding Sync request and check that we get user1's
# membership.
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id2)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id2]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
def test_lazy_loading_room_members_across_multiple_connections(self) -> None:
"""Test that lazy loading room members are tracked per-connection
correctly.
This catches bugs where if a membership got sent down one connection,
it would incorrectly assume it was sent down another connection.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.send(room_id1, "1", tok=user2_tok)
# Make a sync with lazy loading for the room members to establish
# a position
sync_body1 = {
"conn_id": "first-connection",
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 1,
}
},
}
response_body, from_token1 = self.do_sync(sync_body1, tok=user1_tok)
# We expect to see only user2's membership in the room
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user2_id)],
},
exact=True,
)
# Now make a new connection
sync_body2 = {
"conn_id": "second-connection",
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 1,
}
},
}
response_body, from_token2 = self.do_sync(sync_body2, tok=user1_tok)
# We should see user2's membership as this is a new connection
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user2_id)],
},
exact=True,
)
# If we send a message from user1 and sync again on the first connection,
# we should get user1's membership
self.helper.send(room_id1, "2", tok=user1_tok)
response_body, from_token1 = self.do_sync(
sync_body1, since=from_token1, tok=user1_tok
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
# We sync again on the first connection to "ack" the position. This
# triggers the `sliding_sync_connection_lazy_members` to set its
# connection_position to null.
self.do_sync(sync_body1, since=from_token1, tok=user1_tok)
# If we sync again on the second connection, we should also get user1's
# membership
response_body, _ = self.do_sync(sync_body2, since=from_token2, tok=user1_tok)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
def test_lazy_loading_room_members_forked_position(self) -> None:
"""Test that lazy loading room members are tracked correctly when a
connection position is reused"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.send(room_id1, "1", tok=user2_tok)
# Make a sync with lazy loading for the room members to establish
# a position
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 1,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# We expect to see only user2's membership in the room
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user2_id)],
},
exact=True,
)
# Send a message in room1 from user1
self.helper.send(room_id1, "2", tok=user1_tok)
# Make an incremental Sliding Sync request and check that we get user1's
# membership.
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
# Now, reuse the original position and check we still get user1's
# membership.
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
def test_lazy_loading_room_members_explicit_membership_removed(self) -> None:
"""Test the case where we requested explicit memberships and then later
changed to lazy loading."""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.send(room_id1, "1", tok=user2_tok)
# Make a sync with lazy loading for the room members to establish
# a position
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.ME],
],
"timeline_limit": 1,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# We expect to see only user1's membership in the room
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
# Now change to lazy loading...
sync_body["lists"]["foo-list"]["required_state"] = [
[EventTypes.Member, StateValues.LAZY],
]
# Send a message in room1 from user2
self.helper.send(room_id1, "2", tok=user2_tok)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# We should see user2's membership as it's in the timeline
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user2_id)],
},
exact=True,
)
# Now send a message in room1 from user1
self.helper.send(room_id1, "3", tok=user1_tok)
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# We should not see any memberships as we've already seen user1's
# membership.
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1].get("required_state", []),
[],
exact=True,
)
def test_rooms_required_state_me(self) -> None:
"""
Test `rooms.required_state` correctly handles $ME.
@@ -1686,3 +2113,135 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
# We should not see the room name again, as we have already sent that
# down.
self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
def test_lazy_loading_room_members_state_reset_non_limited_timeline(self) -> None:
"""Test that when using lazy-loaded members, if a membership state is
reset to a previous state and the sync is not limited, then we send down
the state reset.
Regression test: previously we only returned memberships relevant to
the timeline, and so did not tell clients about state resets for
users who had not sent any timeline events.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
content = self.helper.join(room_id, user1_id, tok=user1_tok)
first_event_id = content["event_id"]
# Send a message so that the user1 membership comes down sync (because we're lazy-loading room members)
self.helper.send(room_id, "msg", tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 1,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Check that user1 is returned
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
# user1 changes their display name
content = self.helper.send_state(
room_id,
EventTypes.Member,
body={"membership": "join", "displayname": "New display name"},
state_key=user1_id,
tok=user1_tok,
)
second_event_id = content["event_id"]
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# We should see the updated membership state
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
self.assertEqual(
response_body["rooms"][room_id]["required_state"][0]["event_id"],
second_event_id,
)
# Now, fake a state reset of the membership back to the first event
persist_event_store = self.hs.get_datastores().persist_events
assert persist_event_store is not None
self.get_success(
persist_event_store.update_current_state(
room_id,
DeltaState(
to_insert={(EventTypes.Member, user1_id): first_event_id},
to_delete=[],
),
# We don't need to worry about sliding sync changes for this test
SlidingSyncTableChanges(
room_id=room_id,
joined_room_bump_stamp_to_fully_insert=None,
joined_room_updates={},
membership_snapshot_shared_insert_values={},
to_insert_membership_snapshots=[],
to_delete_membership_snapshots=[],
),
)
)
# Send a message from *user2* so that user1 wouldn't normally get
# synced.
self.helper.send(room_id, "msg2", tok=user2_tok)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# This should be a non-limited sync as there is only one timeline event
# (<= `timeline_limit`). This is important as we're specifically testing the
# non-`limited` timeline scenario. For reference, we don't send down state
# resets on limited timelines when using lazy-loaded memberships.
self.assertFalse(
response_body["rooms"][room_id].get("limited", False),
"Expected a non-limited timeline",
)
# We should see the reset membership state of user1
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
)
self.assertEqual(
response_body["rooms"][room_id]["required_state"][0]["event_id"],
first_event_id,
)

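The per-room, per-connection deduplication these tests verify can be summed up as an invariant: a given member's state is sent down a connection at most once per room, tracked independently for each connection. A toy model of that bookkeeping (not Synapse's actual implementation, just the invariant):

```python
class LazyMemberTracker:
    """Toy model of lazy-loaded member tracking: remembers which
    (connection, room, user) memberships have already been sent."""

    def __init__(self) -> None:
        self._sent: set = set()

    def members_to_send(self, conn_id: str, room_id: str, timeline_senders: list) -> list:
        """Return the timeline senders whose membership still needs to be
        sent down this connection for this room, and mark them as sent."""
        out = []
        for user_id in timeline_senders:
            key = (conn_id, room_id, user_id)
            if key not in self._sent:
                self._sent.add(key)
                out.append(user_id)
        return out
```

This mirrors the assertions above: a repeat sender in the same room and connection yields nothing, while a fresh connection or a different room sends the membership again.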

@@ -257,7 +257,7 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
invitee_user_id: str,
unsigned_invite_room_state: list[StrippedStateEvent] | None,
invite_room_id: str | None = None,
) -> tuple[str, EventBase]:
"""
Create a fake invite for a remote room and persist it.
@@ -323,11 +323,13 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
context = EventContext.for_outlier(self.hs.get_storage_controllers())
persist_controller = self.hs.get_storage_controllers().persistence
assert persist_controller is not None
persisted_event, _, _ = self.get_success(
persist_controller.persist_event(invite_event, context)
)
self._remote_invite_count += 1
return invite_room_id, persisted_event
def _bump_notifier_wait_for_events(
self,
@@ -763,7 +765,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
user1_tok = self.login(user1_id, "pass")
# Create a remote room invite (out-of-band membership)
room_id, _ = self._create_remote_invite_room_for_user(user1_id, None)
# Make the Sliding Sync request
sync_body = {


@@ -55,12 +55,16 @@ class UserMutualRoomsTest(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
mutual_rooms.MUTUAL_ROOMS_BATCH_LIMIT = 10
def _get_mutual_rooms(
self, token: str, other_user: str, since_token: str | None = None
) -> FakeChannel:
return self.make_request(
"GET",
"/_matrix/client/unstable/uk.half-shot.msc2666/user/mutual_rooms"
f"?user_id={quote(other_user)}"
+ (f"&from={quote(since_token)}" if since_token else ""),
access_token=token,
)
@@ -141,6 +145,52 @@ class UserMutualRoomsTest(unittest.HomeserverTestCase):
for room_id_id in channel.json_body["joined"]:
self.assertIn(room_id_id, [room_id_one, room_id_two])
def _create_rooms_for_pagination_test(
self, count: int
) -> tuple[str, str, list[str]]:
u1 = self.register_user("user1", "pass")
u1_token = self.login(u1, "pass")
u2 = self.register_user("user2", "pass")
u2_token = self.login(u2, "pass")
room_ids = []
for i in range(count):
room_id = self.helper.create_room_as(u1, is_public=i % 2 == 0, tok=u1_token)
self.helper.invite(room_id, src=u1, targ=u2, tok=u1_token)
self.helper.join(room_id, user=u2, tok=u2_token)
room_ids.append(room_id)
room_ids.sort()
return u1_token, u2, room_ids
def test_shared_room_list_pagination_two_pages(self) -> None:
u1_token, u2, room_ids = self._create_rooms_for_pagination_test(15)
channel = self._get_mutual_rooms(u1_token, u2)
self.assertEqual(200, channel.code, channel.result)
self.assertEqual(channel.json_body["joined"], room_ids[0:10])
self.assertIn("next_batch", channel.json_body)
channel = self._get_mutual_rooms(u1_token, u2, channel.json_body["next_batch"])
self.assertEqual(200, channel.code, channel.result)
self.assertEqual(channel.json_body["joined"], room_ids[10:20])
self.assertNotIn("next_batch", channel.json_body)
def test_shared_room_list_pagination_one_page(self) -> None:
u1_token, u2, room_ids = self._create_rooms_for_pagination_test(10)
channel = self._get_mutual_rooms(u1_token, u2)
self.assertEqual(200, channel.code, channel.result)
self.assertEqual(channel.json_body["joined"], room_ids)
self.assertNotIn("next_batch", channel.json_body)
def test_shared_room_list_pagination_invalid_token(self) -> None:
u1_token, u2, room_ids = self._create_rooms_for_pagination_test(10)
channel = self._get_mutual_rooms(u1_token, u2, "!<>##faketoken")
self.assertEqual(400, channel.code, channel.result)
self.assertEqual(
"M_INVALID_PARAM", channel.json_body["errcode"], channel.result
)
def test_shared_room_list_after_leave(self) -> None:
"""
A room should no longer be considered shared if the other
@@ -172,3 +222,14 @@ class UserMutualRoomsTest(unittest.HomeserverTestCase):
channel = self._get_mutual_rooms(u2_token, u1)
self.assertEqual(200, channel.code, channel.result)
self.assertEqual(len(channel.json_body["joined"]), 0)
def test_shared_room_list_nonexistent_user(self) -> None:
u1 = self.register_user("user1", "pass")
u1_token = self.login(u1, "pass")
# Check shared rooms with a user that does not exist.
# We should see no rooms in common.
channel = self._get_mutual_rooms(u1_token, "@meow:example.com")
self.assertEqual(200, channel.code, channel.result)
self.assertEqual(len(channel.json_body["joined"]), 0)
self.assertNotIn("next_batch", channel.json_body)
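The pagination contract these tests exercise — a sorted `joined` list plus an optional `next_batch` token that the client feeds back via the `from` query parameter — can be consumed with a short loop. A sketch only: `fetch_page` is a hypothetical stand-in for the HTTP request to the MSC2666 endpoint.

```python
def fetch_all_mutual_rooms(fetch_page):
    """Collect every mutual room by following `next_batch` tokens.

    `fetch_page(token)` is assumed to return the decoded JSON body of
    GET .../uk.half-shot.msc2666/user/mutual_rooms (passing `from=token`
    when token is not None).
    """
    rooms = []
    token = None
    while True:
        body = fetch_page(token)
        rooms.extend(body["joined"])
        token = body.get("next_batch")
        if token is None:
            return rooms


# Fake two-page response mimicking the 15-room test above.
pages = {
    None: {"joined": [f"!r{i}:hs" for i in range(10)], "next_batch": "t10"},
    "t10": {"joined": [f"!r{i}:hs" for i in range(10, 15)]},
}
print(fetch_all_mutual_rooms(pages.get))  # 15 room IDs, in order
```

The loop terminates exactly when the server omits `next_batch`, which is the condition the one-page and two-page tests assert from opposite sides.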


@@ -147,7 +147,7 @@ class FakeChannel:
_reactor: MemoryReactorClock
result: dict = attr.Factory(dict)
_ip: str = "127.0.0.1"
-_producer: IPullProducer | IPushProducer | None = None
+_producer: Optional[Union[IPullProducer, IPushProducer]] = None
resource_usage: ContextResourceUsage | None = None
_request: Request | None = None
@@ -248,7 +248,7 @@ class FakeChannel:
# TODO This should ensure that the IProducer is an IPushProducer or
# IPullProducer, unfortunately twisted.protocols.basic.FileSender does
# implement those, but doesn't declare it.
-self._producer = cast(IPushProducer | IPullProducer, producer)
+self._producer = cast(Union[IPushProducer, IPullProducer], producer)
self.producerStreaming = streaming
def _produce() -> None:
@@ -852,7 +852,7 @@ class FakeTransport:
"""Test reactor
"""
-_protocol: IProtocol | None = None
+_protocol: Optional[IProtocol] = None
"""The Protocol which is producing data for this transport. Optional, but if set
will get called back for connectionLost() notifications etc.
"""
@@ -871,7 +871,7 @@ class FakeTransport:
disconnected = False
connected = True
buffer: bytes = b""
-producer: IPushProducer | None = None
+producer: Optional[IPushProducer] = None
autoflush: bool = True
def getPeer(self) -> IPv4Address | IPv6Address:
@@ -1073,7 +1073,7 @@ def setup_test_homeserver(
cleanup_func: Callable[[Callable[[], Optional["Deferred[None]"]]], None],
server_name: str = "test",
config: HomeServerConfig | None = None,
-reactor: ISynapseReactor | None = None
+reactor: Optional[ISynapseReactor] = None
homeserver_to_use: type[HomeServer] = TestHomeServer,
db_txn_limit: int | None = None,
**extra_homeserver_attributes: Any,


@@ -30,19 +30,23 @@ from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, StrippedStateEvent, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.rest import admin
-from synapse.rest.client import login, room
+from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.storage.databases.main.events import DeltaState
from synapse.storage.databases.main.events_bg_updates import (
_resolve_stale_data_in_sliding_sync_joined_rooms_table,
_resolve_stale_data_in_sliding_sync_membership_snapshots_table,
)
-from synapse.types import create_requester
+from synapse.types import SlidingSyncStreamToken, create_requester
from synapse.types.handlers.sliding_sync import (
LAZY_MEMBERS_UPDATE_INTERVAL,
StateValues,
)
from synapse.types.storage import _BackgroundUpdates
from synapse.util.clock import Clock
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
from tests.test_utils.event_injection import create_event
from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
@@ -86,7 +90,7 @@ class _SlidingSyncMembershipSnapshotResult:
forgotten: bool = False
-class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
+class SlidingSyncTablesTestCaseBase(SlidingSyncBase):
"""
Helpers to deal with testing that the
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` database tables are
@@ -97,6 +101,7 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
admin.register_servlets,
login.register_servlets,
room.register_servlets,
+sync.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
@@ -202,78 +207,6 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
for row in rows
}
_remote_invite_count: int = 0
def _create_remote_invite_room_for_user(
self,
invitee_user_id: str,
unsigned_invite_room_state: list[StrippedStateEvent] | None,
) -> tuple[str, EventBase]:
"""
Create a fake invite for a remote room and persist it.
We don't have any state for these kinds of rooms and can only rely on the
stripped state included in the unsigned portion of the invite event to identify
the room.
Args:
invitee_user_id: The person being invited
unsigned_invite_room_state: List of stripped state events to assist the
receiver in identifying the room.
Returns:
The room ID of the remote invite room and the persisted remote invite event.
"""
invite_room_id = f"!test_room{self._remote_invite_count}:remote_server"
invite_event_dict = {
"room_id": invite_room_id,
"sender": "@inviter:remote_server",
"state_key": invitee_user_id,
"depth": 1,
"origin_server_ts": 1,
"type": EventTypes.Member,
"content": {"membership": Membership.INVITE},
"auth_events": [],
"prev_events": [],
}
if unsigned_invite_room_state is not None:
serialized_stripped_state_events = []
for stripped_event in unsigned_invite_room_state:
serialized_stripped_state_events.append(
{
"type": stripped_event.type,
"state_key": stripped_event.state_key,
"sender": stripped_event.sender,
"content": stripped_event.content,
}
)
invite_event_dict["unsigned"] = {
"invite_room_state": serialized_stripped_state_events
}
invite_event = make_event_from_dict(
invite_event_dict,
room_version=RoomVersions.V10,
)
invite_event.internal_metadata.outlier = True
invite_event.internal_metadata.out_of_band_membership = True
self.get_success(
self.store.maybe_store_room_on_outlier_membership(
room_id=invite_room_id, room_version=invite_event.room_version
)
)
context = EventContext.for_outlier(self.hs.get_storage_controllers())
persisted_event, _, _ = self.get_success(
self.persist_controller.persist_event(invite_event, context)
)
self._remote_invite_count += 1
return invite_room_id, persisted_event
def _retract_remote_invite_for_user(
self,
user_id: str,
@@ -3052,6 +2985,141 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
exact=True,
)
def test_lazy_loading_room_members_last_seen_ts(self) -> None:
"""Test that the `last_seen_ts` column in
`sliding_sync_connection_lazy_members` is correctly kept up to date.
We expect that it only gets updated every
`LAZY_MEMBERS_UPDATE_INTERVAL`, rather than on every sync.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
self.helper.join(room_id, user1_id, tok=user1_tok)
# Send a message so that user1 comes down sync.
self.helper.send(room_id, "msg", tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 1,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Check that user1 is returned
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
},
exact=True,
)
# Check that we have an entry in sliding_sync_connection_lazy_members
connection_pos1 = self.get_success(
SlidingSyncStreamToken.from_string(self.store, from_token)
).connection_position
lazy_member_entries = self.get_success(
self.store.get_sliding_sync_connection_lazy_members(
connection_pos1, room_id, {user1_id}
)
)
self.assertIn(user1_id, lazy_member_entries)
prev_timestamp = lazy_member_entries[user1_id]
# If user1 sends a message then we consider it for lazy loading. We have
# previously returned it so we don't send the state down again, but it
# is still eligible for updating the timestamp. Since we last updated
# the timestamp within the last `LAZY_MEMBERS_UPDATE_INTERVAL`, we do not
# update it.
self.helper.send(room_id, "msg2", tok=user1_tok)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# We expect the required_state map to be empty as nothing has changed.
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id].get("required_state", []),
{},
exact=True,
)
connection_pos2 = self.get_success(
SlidingSyncStreamToken.from_string(self.store, from_token)
).connection_position
lazy_member_entries = self.get_success(
self.store.get_sliding_sync_connection_lazy_members(
connection_pos2, room_id, {user1_id}
)
)
# The timestamp should be unchanged.
self.assertEqual(lazy_member_entries[user1_id], prev_timestamp)
# Now advance the time by `LAZY_MEMBERS_UPDATE_INTERVAL` so that we
# would update the timestamp.
self.reactor.advance(LAZY_MEMBERS_UPDATE_INTERVAL.as_secs())
# Send a message from user2
self.helper.send(room_id, "msg3", tok=user2_tok)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
connection_pos3 = self.get_success(
SlidingSyncStreamToken.from_string(self.store, from_token)
).connection_position
lazy_member_entries = self.get_success(
self.store.get_sliding_sync_connection_lazy_members(
connection_pos3, room_id, {user1_id}
)
)
# The timestamp for user1 should be unchanged, as they were not sent down.
self.assertEqual(lazy_member_entries[user1_id], prev_timestamp)
# Now if user1 sends a message, then the timestamp should be updated, as
# it's been over `LAZY_MEMBERS_UPDATE_INTERVAL` since we last updated it.
# (Even though we don't send the state down again).
self.helper.send(room_id, "msg4", tok=user1_tok)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
connection_pos4 = self.get_success(
SlidingSyncStreamToken.from_string(self.store, from_token)
).connection_position
lazy_member_entries = self.get_success(
self.store.get_sliding_sync_connection_lazy_members(
connection_pos4, room_id, {user1_id}
)
)
# The timestamp for user1 should be updated.
self.assertGreater(lazy_member_entries[user1_id], prev_timestamp)
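The behaviour asserted above amounts to an interval-gated write: the stored timestamp is only bumped once it is at least `LAZY_MEMBERS_UPDATE_INTERVAL` stale, so every sync does not turn into a database write. A sketch under the assumption of millisecond timestamps; the helper name and the interval value are illustrative, not Synapse's actual implementation:

```python
LAZY_MEMBERS_UPDATE_INTERVAL_MS = 30 * 60 * 1000  # assumed value for illustration


def maybe_bump_last_seen(last_seen_ts: int, now_ts: int) -> int:
    """Return the stored `last_seen_ts` after a sync in which the member
    was eligible: bump it only when the previous value is at least
    LAZY_MEMBERS_UPDATE_INTERVAL_MS old, otherwise keep it as-is."""
    if now_ts - last_seen_ts >= LAZY_MEMBERS_UPDATE_INTERVAL_MS:
        return now_ts
    return last_seen_ts


# Within the interval: timestamp unchanged (as in the `msg2` sync above).
print(maybe_bump_last_seen(1_000, 1_000 + 60_000))  # 1000
# Past the interval: timestamp bumped (as in the `msg4` sync above).
print(maybe_bump_last_seen(1_000, 1_000 + 31 * 60 * 1000))  # 1861000
```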
class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
"""


@@ -37,6 +37,7 @@ from typing import (
Iterable,
Mapping,
NoReturn,
+Optional,
Protocol,
TypeVar,
)
@@ -636,7 +637,7 @@ class HomeserverTestCase(TestCase):
self,
server_name: str | None = None,
config: JsonDict | None = None,
-reactor: ISynapseReactor | None = None,
+reactor: Optional[ISynapseReactor] = None,
clock: Clock | None = None,
**extra_homeserver_attributes: Any,
) -> HomeServer:


@@ -198,7 +198,9 @@ def default_config(
"rc_invites": {
"per_room": {"per_second": 10000, "burst_count": 10000},
"per_user": {"per_second": 10000, "burst_count": 10000},
"per_issuer": {"per_second": 10000, "burst_count": 10000},
},
"rc_room_creation": {"per_second": 10000, "burst_count": 10000},
"rc_3pid_validation": {"per_second": 10000, "burst_count": 10000},
"rc_presence": {"per_user": {"per_second": 10000, "burst_count": 10000}},
"saml2_enabled": False,