mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-17 02:10:27 +00:00
Compare commits
13 Commits
dkasak/imp
...
develop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f4320b5a49 | ||
|
|
3989d22a37 | ||
|
|
0395b71e25 | ||
|
|
29fd0116a5 | ||
|
|
0f2b29511f | ||
|
|
466994743a | ||
|
|
df24e0f302 | ||
|
|
048629dd13 | ||
|
|
7347cc436e | ||
|
|
3f636386a6 | ||
|
|
1f7f16477d | ||
|
|
dfd00a986f | ||
|
|
cdf286d405 |
@@ -7,4 +7,4 @@ if command -v yum &> /dev/null; then
|
||||
fi
|
||||
|
||||
# Install a Rust toolchain
|
||||
curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain 1.82.0 -y --profile minimal
|
||||
curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain stable -y --profile minimal
|
||||
|
||||
@@ -1,39 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# this script is run by GitHub Actions in a plain `jammy` container; it
|
||||
# - installs the minimal system requirements, and poetry;
|
||||
# - patches the project definition file to refer to old versions only;
|
||||
# - creates a venv with these old versions using poetry; and finally
|
||||
# - invokes `trial` to run the tests with old deps.
|
||||
|
||||
set -ex
|
||||
|
||||
# Prevent virtualenv from auto-updating pip to an incompatible version
|
||||
export VIRTUALENV_NO_DOWNLOAD=1
|
||||
|
||||
# TODO: in the future, we could use an implementation of
|
||||
# https://github.com/python-poetry/poetry/issues/3527
|
||||
# https://github.com/pypa/pip/issues/8085
|
||||
# to select the lowest possible versions, rather than resorting to this sed script.
|
||||
|
||||
# Patch the project definitions in-place:
|
||||
# - `-E` use extended regex syntax.
|
||||
# - Don't modify the line that defines required Python versions.
|
||||
# - Replace all lower and tilde bounds with exact bounds.
|
||||
# - Replace all caret bounds with exact bounds.
|
||||
# - Delete all lines referring to psycopg2 - so no testing of postgres support.
|
||||
# - Use pyopenssl 17.0, which is the oldest version that works with
|
||||
# a `cryptography` compiled against OpenSSL 1.1.
|
||||
# - Omit systemd: we're not logging to journal here.
|
||||
|
||||
sed -i -E '
|
||||
/^\s*requires-python\s*=/b
|
||||
s/[~>]=/==/g
|
||||
s/\^/==/g
|
||||
/psycopg2/d
|
||||
s/pyOpenSSL\s*==\s*16\.0\.0"/pyOpenSSL==17.0.0"/
|
||||
/systemd/d
|
||||
' pyproject.toml
|
||||
|
||||
echo "::group::Patched pyproject.toml"
|
||||
cat pyproject.toml
|
||||
echo "::endgroup::"
|
||||
2
.github/workflows/release-artifacts.yml
vendored
2
.github/workflows/release-artifacts.yml
vendored
@@ -5,7 +5,7 @@ name: Build release artifacts
|
||||
on:
|
||||
# we build on PRs and develop to (hopefully) get early warning
|
||||
# of things breaking (but only build one set of debs). PRs skip
|
||||
# building wheels on macOS & ARM.
|
||||
# building wheels on ARM.
|
||||
pull_request:
|
||||
push:
|
||||
branches: ["develop", "release-*"]
|
||||
|
||||
14
.github/workflows/tests.yml
vendored
14
.github/workflows/tests.yml
vendored
@@ -452,14 +452,12 @@ jobs:
|
||||
python-version: '3.10'
|
||||
|
||||
- name: Prepare old deps
|
||||
if: steps.cache-poetry-old-deps.outputs.cache-hit != 'true'
|
||||
run: .ci/scripts/prepare_old_deps.sh
|
||||
|
||||
# Note: we install using `pip` here, not poetry. `poetry install` ignores the
|
||||
# build-system section (https://github.com/python-poetry/poetry/issues/6154), but
|
||||
# we explicitly want to test that you can `pip install` using the oldest version
|
||||
# of poetry-core and setuptools-rust.
|
||||
- run: pip install .[all,test]
|
||||
# Note: we install using `uv` here, not poetry or pip to allow us to test with the
|
||||
# minimum version of all dependencies, both those explicitly specified and those
|
||||
# implicitly brought in by the explicit dependencies.
|
||||
run: |
|
||||
pip install uv
|
||||
uv pip install --system --resolution=lowest .[all,test]
|
||||
|
||||
# We nuke the local copy, as we've installed synapse into the virtualenv
|
||||
# (rather than use an editable install, which we no longer support). If we
|
||||
|
||||
1
changelog.d/19206.bugfix
Normal file
1
changelog.d/19206.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix sliding sync performance slow down for long lived connections.
|
||||
1
changelog.d/19231.bugfix
Normal file
1
changelog.d/19231.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a bug where Mastodon posts (and possibly other embeds) have the wrong description for URL previews.
|
||||
1
changelog.d/19260.feature
Normal file
1
changelog.d/19260.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add `memberships` endpoint to the admin API. This is useful for forensics and T&S purpose.
|
||||
1
changelog.d/19268.feature
Normal file
1
changelog.d/19268.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add an admin API for retrieving a paginated list of quarantined media.
|
||||
1
changelog.d/19270.doc
Normal file
1
changelog.d/19270.doc
Normal file
@@ -0,0 +1 @@
|
||||
Document the importance of `public_baseurl` when configuring OpenID Connect authentication.
|
||||
1
changelog.d/19274.bugfix
Normal file
1
changelog.d/19274.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix bug introduced in 1.143.0 that broke support for versions of `zope-interface` older than 6.2.
|
||||
1
changelog.d/19275.feature
Normal file
1
changelog.d/19275.feature
Normal file
@@ -0,0 +1 @@
|
||||
Server admins can bypass the quarantine media check when downloading media by setting the `admin_unsafely_bypass_quarantine` query parameter to `true` on Client-Server API media download requests.
|
||||
1
changelog.d/19279.feature
Normal file
1
changelog.d/19279.feature
Normal file
@@ -0,0 +1 @@
|
||||
Implemented pagination for the [MSC2666](https://github.com/matrix-org/matrix-spec-proposals/pull/2666) mutual rooms endpoint. Contributed by @tulir @ Beeper.
|
||||
1
changelog.d/19281.feature
Normal file
1
changelog.d/19281.feature
Normal file
@@ -0,0 +1 @@
|
||||
Admin API: add worker support to `GET /_synapse/admin/v2/users/<user_id>`.
|
||||
1
changelog.d/19289.misc
Normal file
1
changelog.d/19289.misc
Normal file
@@ -0,0 +1 @@
|
||||
Use `uv` to test olddeps to ensure all transitive dependencies use minimum versions.
|
||||
1
changelog.d/19300.feature
Normal file
1
changelog.d/19300.feature
Normal file
@@ -0,0 +1 @@
|
||||
Improve proxy support for the `federation_client.py` dev script. Contributed by Denis Kasak (@dkasak).
|
||||
1
changelog.d/19302.misc
Normal file
1
changelog.d/19302.misc
Normal file
@@ -0,0 +1 @@
|
||||
Unpin the version of Rust we use to build Synapse wheels (was 1.82.0) now that MacOS support has been dropped.
|
||||
@@ -73,6 +73,33 @@ Response:
|
||||
}
|
||||
```
|
||||
|
||||
## Listing all quarantined media
|
||||
|
||||
This API returns a list of all quarantined media on the server. It is paginated, and can be scoped to either local or
|
||||
remote media. Note that the pagination values are also scoped to the request parameters - changing them but keeping the
|
||||
same pagination values will result in unexpected results.
|
||||
|
||||
Request:
|
||||
```http
|
||||
GET /_synapse/admin/v1/media/quarantined?from=0&limit=100&kind=local
|
||||
```
|
||||
|
||||
`from` and `limit` are optional parameters, and default to `0` and `100` respectively. They are the row index and number
|
||||
of rows to return - they are not timestamps.
|
||||
|
||||
`kind` *MUST* either be `local` or `remote`.
|
||||
|
||||
The API returns a JSON body containing MXC URIs for the quarantined media, like the following:
|
||||
|
||||
```json
|
||||
{
|
||||
"media": [
|
||||
"mxc://localhost/xwvutsrqponmlkjihgfedcba",
|
||||
"mxc://localhost/abcdefghijklmnopqrstuvwx"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
# Quarantine media
|
||||
|
||||
Quarantining media means that it is marked as inaccessible by users. It applies
|
||||
@@ -88,6 +115,20 @@ is quarantined, Synapse will:
|
||||
- Quarantine any existing cached remote media.
|
||||
- Quarantine any future remote media.
|
||||
|
||||
## Downloading quarantined media
|
||||
|
||||
Normally, when media is quarantined, it will return a 404 error when downloaded.
|
||||
Admins can bypass this by adding `?admin_unsafely_bypass_quarantine=true`
|
||||
to the [normal download URL](https://spec.matrix.org/v1.16/client-server-api/#get_matrixclientv1mediadownloadservernamemediaid).
|
||||
|
||||
Bypassing the quarantine check is not recommended. Media is typically quarantined
|
||||
to prevent harmful content from being served to users, which includes admins. Only
|
||||
set the bypass parameter if you intentionally want to access potentially harmful
|
||||
content.
|
||||
|
||||
Non-admin users cannot bypass quarantine checks, even when specifying the above
|
||||
query parameter.
|
||||
|
||||
## Quarantining media by ID
|
||||
|
||||
This API quarantines a single piece of local or remote media.
|
||||
|
||||
@@ -505,6 +505,55 @@ with a body of:
|
||||
}
|
||||
```
|
||||
|
||||
## List room memberships of a user
|
||||
|
||||
Gets a list of room memberships for a specific `user_id`. This
|
||||
endpoint differs from
|
||||
[`GET /_synapse/admin/v1/users/<user_id>/joined_rooms`](#list-joined-rooms-of-a-user)
|
||||
in that it returns rooms with memberships other than "join".
|
||||
|
||||
The API is:
|
||||
|
||||
```
|
||||
GET /_synapse/admin/v1/users/<user_id>/memberships
|
||||
```
|
||||
|
||||
A response body like the following is returned:
|
||||
|
||||
```json
|
||||
{
|
||||
"memberships": {
|
||||
"!DuGcnbhHGaSZQoNQR:matrix.org": "join",
|
||||
"!ZtSaPCawyWtxfWiIy:matrix.org": "leave",
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
which is a list of room membership states for the given user. This endpoint can
|
||||
be used with both local and remote users, with the caveat that the homeserver will
|
||||
only be aware of the memberships for rooms that one of its local users has joined.
|
||||
|
||||
Remote user memberships may also be out of date if all local users have since left
|
||||
a room. The homeserver will thus no longer receive membership updates about it.
|
||||
|
||||
The list includes rooms that the user has since left; other membership states (knock,
|
||||
invite, etc.) are also possible.
|
||||
|
||||
Note that rooms will only disappear from this list if they are
|
||||
[purged](./rooms.md#delete-room-api) from the homeserver.
|
||||
|
||||
**Parameters**
|
||||
|
||||
The following parameters should be set in the URL:
|
||||
|
||||
- `user_id` - fully qualified: for example, `@user:server.com`.
|
||||
|
||||
**Response**
|
||||
|
||||
The following fields are returned in the JSON response body:
|
||||
|
||||
- `memberships` - A map of `room_id` (string) to `membership` state (string).
|
||||
|
||||
## List joined rooms of a user
|
||||
|
||||
Gets a list of all `room_id` that a specific `user_id` is joined to and is a member of (participating in).
|
||||
|
||||
@@ -50,6 +50,11 @@ setting in your configuration file.
|
||||
See the [configuration manual](usage/configuration/config_documentation.md#oidc_providers) for some sample settings, as well as
|
||||
the text below for example configurations for specific providers.
|
||||
|
||||
For setups using [`.well-known` delegation](delegate.md), make sure
|
||||
[`public_baseurl`](usage/configuration/config_documentation.md#public_baseurl) is set
|
||||
appropriately. If unset, Synapse defaults to `https://<server_name>/` which is used in
|
||||
the OIDC callback URL.
|
||||
|
||||
## OIDC Back-Channel Logout
|
||||
|
||||
Synapse supports receiving [OpenID Connect Back-Channel Logout](https://openid.net/specs/openid-connect-backchannel-1_0.html) notifications.
|
||||
|
||||
@@ -255,6 +255,8 @@ information.
|
||||
^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$
|
||||
^/_matrix/client/(r0|v3|unstable)/capabilities$
|
||||
^/_matrix/client/(r0|v3|unstable)/notifications$
|
||||
|
||||
# Admin API requests
|
||||
^/_synapse/admin/v1/rooms/[^/]+$
|
||||
|
||||
# Encryption requests
|
||||
@@ -300,6 +302,9 @@ Additionally, the following REST endpoints can be handled for GET requests:
|
||||
# Presence requests
|
||||
^/_matrix/client/(api/v1|r0|v3|unstable)/presence/
|
||||
|
||||
# Admin API requests
|
||||
^/_synapse/admin/v2/users/[^/]+$
|
||||
|
||||
Pagination requests can also be handled, but all requests for a given
|
||||
room must be routed to the same instance. Additionally, care must be taken to
|
||||
ensure that the purge history admin API is not used while pagination requests
|
||||
|
||||
109
poetry.lock
generated
109
poetry.lock
generated
@@ -31,7 +31,7 @@ description = "The ultimate Python library in building OAuth and OpenID Connect
|
||||
optional = true
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"jwt\" or extra == \"oidc\""
|
||||
markers = "extra == \"oidc\" or extra == \"jwt\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "authlib-1.6.5-py2.py3-none-any.whl", hash = "sha256:3e0e0507807f842b02175507bdee8957a1d5707fd4afb17c32fb43fee90b6e3a"},
|
||||
{file = "authlib-1.6.5.tar.gz", hash = "sha256:6aaf9c79b7cc96c900f0b284061691c5d4e61221640a948fe690b556a6d6d10b"},
|
||||
@@ -481,7 +481,7 @@ description = "XML bomb protection for Python stdlib modules"
|
||||
optional = true
|
||||
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"saml2\""
|
||||
markers = "extra == \"saml2\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61"},
|
||||
{file = "defusedxml-0.7.1.tar.gz", hash = "sha256:1bb3032db185915b62d7c6209c5a8792be6a32ab2fedacc84e01b52c51aa3e69"},
|
||||
@@ -506,7 +506,7 @@ description = "XPath 1.0/2.0/3.0/3.1 parsers and selectors for ElementTree and l
|
||||
optional = true
|
||||
python-versions = ">=3.7"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"saml2\""
|
||||
markers = "extra == \"saml2\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "elementpath-4.1.5-py3-none-any.whl", hash = "sha256:2ac1a2fb31eb22bbbf817f8cf6752f844513216263f0e3892c8e79782fe4bb55"},
|
||||
{file = "elementpath-4.1.5.tar.gz", hash = "sha256:c2d6dc524b29ef751ecfc416b0627668119d8812441c555d7471da41d4bacb8d"},
|
||||
@@ -556,7 +556,7 @@ description = "Python wrapper for hiredis"
|
||||
optional = true
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"redis\""
|
||||
markers = "extra == \"redis\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "hiredis-3.3.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:9937d9b69321b393fbace69f55423480f098120bc55a3316e1ca3508c4dbbd6f"},
|
||||
{file = "hiredis-3.3.0-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:50351b77f89ba6a22aff430b993653847f36b71d444509036baa0f2d79d1ebf4"},
|
||||
@@ -879,7 +879,7 @@ description = "Jaeger Python OpenTracing Tracer implementation"
|
||||
optional = true
|
||||
python-versions = ">=3.7"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"opentracing\""
|
||||
markers = "extra == \"opentracing\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "jaeger-client-4.8.0.tar.gz", hash = "sha256:3157836edab8e2c209bd2d6ae61113db36f7ee399e66b1dcbb715d87ab49bfe0"},
|
||||
]
|
||||
@@ -1017,7 +1017,7 @@ description = "A strictly RFC 4510 conforming LDAP V3 pure Python client library
|
||||
optional = true
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\""
|
||||
markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "ldap3-2.9.1-py2.py3-none-any.whl", hash = "sha256:5869596fc4948797020d3f03b7939da938778a0f9e2009f7a072ccf92b8e8d70"},
|
||||
{file = "ldap3-2.9.1.tar.gz", hash = "sha256:f3e7fc4718e3f09dda568b57100095e0ce58633bcabbed8667ce3f8fbaa4229f"},
|
||||
@@ -1119,7 +1119,7 @@ description = "Powerful and Pythonic XML processing library combining libxml2/li
|
||||
optional = true
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"url-preview\""
|
||||
markers = "extra == \"url-preview\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "lxml-6.0.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:e77dd455b9a16bbd2a5036a63ddbd479c19572af81b624e79ef422f929eef388"},
|
||||
{file = "lxml-6.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:5d444858b9f07cefff6455b983aea9a67f7462ba1f6cbe4a21e8bf6791bf2153"},
|
||||
@@ -1405,7 +1405,7 @@ description = "An LDAP3 auth provider for Synapse"
|
||||
optional = true
|
||||
python-versions = ">=3.7"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\""
|
||||
markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "matrix-synapse-ldap3-0.3.0.tar.gz", hash = "sha256:8bb6517173164d4b9cc44f49de411d8cebdb2e705d5dd1ea1f38733c4a009e1d"},
|
||||
{file = "matrix_synapse_ldap3-0.3.0-py3-none-any.whl", hash = "sha256:8b4d701f8702551e98cc1d8c20dbed532de5613584c08d0df22de376ba99159d"},
|
||||
@@ -1648,7 +1648,7 @@ description = "OpenTracing API for Python. See documentation at http://opentraci
|
||||
optional = true
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"opentracing\""
|
||||
markers = "extra == \"opentracing\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "opentracing-2.4.0.tar.gz", hash = "sha256:a173117e6ef580d55874734d1fa7ecb6f3655160b8b8974a2a1e98e5ec9c840d"},
|
||||
]
|
||||
@@ -1838,7 +1838,7 @@ description = "psycopg2 - Python-PostgreSQL Database Adapter"
|
||||
optional = true
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"postgres\""
|
||||
markers = "extra == \"postgres\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "psycopg2-2.9.11-cp310-cp310-win_amd64.whl", hash = "sha256:103e857f46bb76908768ead4e2d0ba1d1a130e7b8ed77d3ae91e8b33481813e8"},
|
||||
{file = "psycopg2-2.9.11-cp311-cp311-win_amd64.whl", hash = "sha256:210daed32e18f35e3140a1ebe059ac29209dd96468f2f7559aa59f75ee82a5cb"},
|
||||
@@ -1856,7 +1856,7 @@ description = ".. image:: https://travis-ci.org/chtd/psycopg2cffi.svg?branch=mas
|
||||
optional = true
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")"
|
||||
markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")"
|
||||
files = [
|
||||
{file = "psycopg2cffi-2.9.0.tar.gz", hash = "sha256:7e272edcd837de3a1d12b62185eb85c45a19feda9e62fa1b120c54f9e8d35c52"},
|
||||
]
|
||||
@@ -1872,7 +1872,7 @@ description = "A Simple library to enable psycopg2 compatability"
|
||||
optional = true
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")"
|
||||
markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")"
|
||||
files = [
|
||||
{file = "psycopg2cffi-compat-1.1.tar.gz", hash = "sha256:d25e921748475522b33d13420aad5c2831c743227dc1f1f2585e0fdb5c914e05"},
|
||||
]
|
||||
@@ -2154,7 +2154,7 @@ description = "A development tool to measure, monitor and analyze the memory beh
|
||||
optional = true
|
||||
python-versions = ">=3.6"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"cache-memory\""
|
||||
markers = "extra == \"cache-memory\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "Pympler-1.0.1-py3-none-any.whl", hash = "sha256:d260dda9ae781e1eab6ea15bacb84015849833ba5555f141d2d9b7b7473b307d"},
|
||||
{file = "Pympler-1.0.1.tar.gz", hash = "sha256:993f1a3599ca3f4fcd7160c7545ad06310c9e12f70174ae7ae8d4e25f6c5d3fa"},
|
||||
@@ -2207,6 +2207,63 @@ typing-extensions = {version = ">=4.9", markers = "python_version < \"3.13\" and
|
||||
docs = ["sphinx (!=5.2.0,!=5.2.0.post0,!=7.2.5)", "sphinx_rtd_theme"]
|
||||
test = ["pretend", "pytest (>=3.0.1)", "pytest-rerunfailures"]
|
||||
|
||||
[[package]]
|
||||
name = "pyparsing"
|
||||
version = "3.2.5"
|
||||
description = "pyparsing - Classes and methods to define and execute parsing grammars"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "pyparsing-3.2.5-py3-none-any.whl", hash = "sha256:e38a4f02064cf41fe6593d328d0512495ad1f3d8a91c4f73fc401b3079a59a5e"},
|
||||
{file = "pyparsing-3.2.5.tar.gz", hash = "sha256:2df8d5b7b2802ef88e8d016a2eb9c7aeaa923529cd251ed0fe4608275d4105b6"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
diagrams = ["jinja2", "railroad-diagrams"]
|
||||
|
||||
[[package]]
|
||||
name = "pyrsistent"
|
||||
version = "0.20.0"
|
||||
description = "Persistent/Functional/Immutable data structures"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "pyrsistent-0.20.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:8c3aba3e01235221e5b229a6c05f585f344734bd1ad42a8ac51493d74722bbce"},
|
||||
{file = "pyrsistent-0.20.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c1beb78af5423b879edaf23c5591ff292cf7c33979734c99aa66d5914ead880f"},
|
||||
{file = "pyrsistent-0.20.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21cc459636983764e692b9eba7144cdd54fdec23ccdb1e8ba392a63666c60c34"},
|
||||
{file = "pyrsistent-0.20.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f5ac696f02b3fc01a710427585c855f65cd9c640e14f52abe52020722bb4906b"},
|
||||
{file = "pyrsistent-0.20.0-cp310-cp310-win32.whl", hash = "sha256:0724c506cd8b63c69c7f883cc233aac948c1ea946ea95996ad8b1380c25e1d3f"},
|
||||
{file = "pyrsistent-0.20.0-cp310-cp310-win_amd64.whl", hash = "sha256:8441cf9616d642c475684d6cf2520dd24812e996ba9af15e606df5f6fd9d04a7"},
|
||||
{file = "pyrsistent-0.20.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:0f3b1bcaa1f0629c978b355a7c37acd58907390149b7311b5db1b37648eb6958"},
|
||||
{file = "pyrsistent-0.20.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5cdd7ef1ea7a491ae70d826b6cc64868de09a1d5ff9ef8d574250d0940e275b8"},
|
||||
{file = "pyrsistent-0.20.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cae40a9e3ce178415040a0383f00e8d68b569e97f31928a3a8ad37e3fde6df6a"},
|
||||
{file = "pyrsistent-0.20.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6288b3fa6622ad8a91e6eb759cfc48ff3089e7c17fb1d4c59a919769314af224"},
|
||||
{file = "pyrsistent-0.20.0-cp311-cp311-win32.whl", hash = "sha256:7d29c23bdf6e5438c755b941cef867ec2a4a172ceb9f50553b6ed70d50dfd656"},
|
||||
{file = "pyrsistent-0.20.0-cp311-cp311-win_amd64.whl", hash = "sha256:59a89bccd615551391f3237e00006a26bcf98a4d18623a19909a2c48b8e986ee"},
|
||||
{file = "pyrsistent-0.20.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:09848306523a3aba463c4b49493a760e7a6ca52e4826aa100ee99d8d39b7ad1e"},
|
||||
{file = "pyrsistent-0.20.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a14798c3005ec892bbada26485c2eea3b54109cb2533713e355c806891f63c5e"},
|
||||
{file = "pyrsistent-0.20.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b14decb628fac50db5e02ee5a35a9c0772d20277824cfe845c8a8b717c15daa3"},
|
||||
{file = "pyrsistent-0.20.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2e2c116cc804d9b09ce9814d17df5edf1df0c624aba3b43bc1ad90411487036d"},
|
||||
{file = "pyrsistent-0.20.0-cp312-cp312-win32.whl", hash = "sha256:e78d0c7c1e99a4a45c99143900ea0546025e41bb59ebc10182e947cf1ece9174"},
|
||||
{file = "pyrsistent-0.20.0-cp312-cp312-win_amd64.whl", hash = "sha256:4021a7f963d88ccd15b523787d18ed5e5269ce57aa4037146a2377ff607ae87d"},
|
||||
{file = "pyrsistent-0.20.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:79ed12ba79935adaac1664fd7e0e585a22caa539dfc9b7c7c6d5ebf91fb89054"},
|
||||
{file = "pyrsistent-0.20.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f920385a11207dc372a028b3f1e1038bb244b3ec38d448e6d8e43c6b3ba20e98"},
|
||||
{file = "pyrsistent-0.20.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4f5c2d012671b7391803263419e31b5c7c21e7c95c8760d7fc35602353dee714"},
|
||||
{file = "pyrsistent-0.20.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ef3992833fbd686ee783590639f4b8343a57f1f75de8633749d984dc0eb16c86"},
|
||||
{file = "pyrsistent-0.20.0-cp38-cp38-win32.whl", hash = "sha256:881bbea27bbd32d37eb24dd320a5e745a2a5b092a17f6debc1349252fac85423"},
|
||||
{file = "pyrsistent-0.20.0-cp38-cp38-win_amd64.whl", hash = "sha256:6d270ec9dd33cdb13f4d62c95c1a5a50e6b7cdd86302b494217137f760495b9d"},
|
||||
{file = "pyrsistent-0.20.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:ca52d1ceae015859d16aded12584c59eb3825f7b50c6cfd621d4231a6cc624ce"},
|
||||
{file = "pyrsistent-0.20.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b318ca24db0f0518630e8b6f3831e9cba78f099ed5c1d65ffe3e023003043ba0"},
|
||||
{file = "pyrsistent-0.20.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fed2c3216a605dc9a6ea50c7e84c82906e3684c4e80d2908208f662a6cbf9022"},
|
||||
{file = "pyrsistent-0.20.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2e14c95c16211d166f59c6611533d0dacce2e25de0f76e4c140fde250997b3ca"},
|
||||
{file = "pyrsistent-0.20.0-cp39-cp39-win32.whl", hash = "sha256:f058a615031eea4ef94ead6456f5ec2026c19fb5bd6bfe86e9665c4158cf802f"},
|
||||
{file = "pyrsistent-0.20.0-cp39-cp39-win_amd64.whl", hash = "sha256:58b8f6366e152092194ae68fefe18b9f0b4f89227dfd86a07770c3d86097aebf"},
|
||||
{file = "pyrsistent-0.20.0-py3-none-any.whl", hash = "sha256:c55acc4733aad6560a7f5f818466631f07efc001fd023f34a6c203f8b6df0f0b"},
|
||||
{file = "pyrsistent-0.20.0.tar.gz", hash = "sha256:4c48f78f62ab596c679086084d0dd13254ae4f3d6c72a83ffdf5ebdef8f265a4"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pysaml2"
|
||||
version = "7.5.0"
|
||||
@@ -2214,7 +2271,7 @@ description = "Python implementation of SAML Version 2 Standard"
|
||||
optional = true
|
||||
python-versions = ">=3.9,<4.0"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"saml2\""
|
||||
markers = "extra == \"saml2\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "pysaml2-7.5.0-py3-none-any.whl", hash = "sha256:bc6627cc344476a83c757f440a73fda1369f13b6fda1b4e16bca63ffbabb5318"},
|
||||
{file = "pysaml2-7.5.0.tar.gz", hash = "sha256:f36871d4e5ee857c6b85532e942550d2cf90ea4ee943d75eb681044bbc4f54f7"},
|
||||
@@ -2239,7 +2296,7 @@ description = "Extensions to the standard Python datetime module"
|
||||
optional = true
|
||||
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"saml2\""
|
||||
markers = "extra == \"saml2\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
|
||||
{file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"},
|
||||
@@ -2267,7 +2324,7 @@ description = "World timezone definitions, modern and historical"
|
||||
optional = true
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"saml2\""
|
||||
markers = "extra == \"saml2\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00"},
|
||||
{file = "pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3"},
|
||||
@@ -2671,7 +2728,7 @@ description = "Python client for Sentry (https://sentry.io)"
|
||||
optional = true
|
||||
python-versions = ">=3.6"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"sentry\""
|
||||
markers = "extra == \"sentry\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "sentry_sdk-2.46.0-py2.py3-none-any.whl", hash = "sha256:4eeeb60198074dff8d066ea153fa6f241fef1668c10900ea53a4200abc8da9b1"},
|
||||
{file = "sentry_sdk-2.46.0.tar.gz", hash = "sha256:91821a23460725734b7741523021601593f35731808afc0bb2ba46c27b8acd91"},
|
||||
@@ -2881,7 +2938,7 @@ description = "Tornado IOLoop Backed Concurrent Futures"
|
||||
optional = true
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"opentracing\""
|
||||
markers = "extra == \"opentracing\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "threadloop-1.0.2-py2-none-any.whl", hash = "sha256:5c90dbefab6ffbdba26afb4829d2a9df8275d13ac7dc58dccb0e279992679599"},
|
||||
{file = "threadloop-1.0.2.tar.gz", hash = "sha256:8b180aac31013de13c2ad5c834819771992d350267bddb854613ae77ef571944"},
|
||||
@@ -2897,7 +2954,7 @@ description = "Python bindings for the Apache Thrift RPC system"
|
||||
optional = true
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"opentracing\""
|
||||
markers = "extra == \"opentracing\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "thrift-0.16.0.tar.gz", hash = "sha256:2b5b6488fcded21f9d312aa23c9ff6a0195d0f6ae26ddbd5ad9e3e25dfc14408"},
|
||||
]
|
||||
@@ -2970,7 +3027,7 @@ description = "Tornado is a Python web framework and asynchronous networking lib
|
||||
optional = true
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"opentracing\""
|
||||
markers = "extra == \"opentracing\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "tornado-6.5-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:f81067dad2e4443b015368b24e802d0083fecada4f0a4572fdb72fc06e54a9a6"},
|
||||
{file = "tornado-6.5-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:9ac1cbe1db860b3cbb251e795c701c41d343f06a96049d6274e7c77559117e41"},
|
||||
@@ -3104,7 +3161,7 @@ description = "non-blocking redis client for python"
|
||||
optional = true
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"redis\""
|
||||
markers = "extra == \"redis\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "txredisapi-1.4.11-py3-none-any.whl", hash = "sha256:ac64d7a9342b58edca13ef267d4fa7637c1aa63f8595e066801c1e8b56b22d0b"},
|
||||
{file = "txredisapi-1.4.11.tar.gz", hash = "sha256:3eb1af99aefdefb59eb877b1dd08861efad60915e30ad5bf3d5bf6c5cedcdbc6"},
|
||||
@@ -3350,7 +3407,7 @@ description = "An XML Schema validator and decoder"
|
||||
optional = true
|
||||
python-versions = ">=3.7"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"all\" or extra == \"saml2\""
|
||||
markers = "extra == \"saml2\" or extra == \"all\""
|
||||
files = [
|
||||
{file = "xmlschema-2.4.0-py3-none-any.whl", hash = "sha256:dc87be0caaa61f42649899189aab2fd8e0d567f2cf548433ba7b79278d231a4a"},
|
||||
{file = "xmlschema-2.4.0.tar.gz", hash = "sha256:d74cd0c10866ac609e1ef94a5a69b018ad16e39077bc6393408b40c6babee793"},
|
||||
@@ -3468,15 +3525,15 @@ docs = ["Sphinx", "repoze.sphinx.autointerface"]
|
||||
test = ["zope.i18nmessageid", "zope.testing", "zope.testrunner"]
|
||||
|
||||
[extras]
|
||||
all = ["authlib", "hiredis", "jaeger-client", "lxml", "matrix-synapse-ldap3", "opentracing", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pympler", "pysaml2", "sentry-sdk", "txredisapi"]
|
||||
all = ["authlib", "defusedxml", "hiredis", "jaeger-client", "lxml", "matrix-synapse-ldap3", "opentracing", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pympler", "pysaml2", "pytz", "sentry-sdk", "thrift", "tornado", "txredisapi"]
|
||||
cache-memory = ["pympler"]
|
||||
jwt = ["authlib"]
|
||||
matrix-synapse-ldap3 = ["matrix-synapse-ldap3"]
|
||||
oidc = ["authlib"]
|
||||
opentracing = ["jaeger-client", "opentracing"]
|
||||
opentracing = ["jaeger-client", "opentracing", "thrift", "tornado"]
|
||||
postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat"]
|
||||
redis = ["hiredis", "txredisapi"]
|
||||
saml2 = ["pysaml2"]
|
||||
saml2 = ["defusedxml", "pysaml2", "pytz"]
|
||||
sentry = ["sentry-sdk"]
|
||||
systemd = ["systemd-python"]
|
||||
test = ["idna", "parameterized"]
|
||||
@@ -3485,4 +3542,4 @@ url-preview = ["lxml"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.10.0,<4.0.0"
|
||||
content-hash = "960ddae65fde8574f0f36b6988622fc4baf7646823c36699c5cd4773cad8b0ed"
|
||||
content-hash = "1caa5072f6304122c89377420f993a54f54587f3618ccc8094ec31642264592c"
|
||||
|
||||
@@ -42,7 +42,8 @@ dependencies = [
|
||||
"Twisted[tls]>=21.2.0",
|
||||
"treq>=21.5.0",
|
||||
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
|
||||
"pyOpenSSL>=16.0.0",
|
||||
# pyOpenSSL 16.2.0 fixes compatibility with OpenSSL 1.1.0.
|
||||
"pyOpenSSL>=16.2.0",
|
||||
"PyYAML>=5.3",
|
||||
"pyasn1>=0.1.9",
|
||||
"pyasn1-modules>=0.0.7",
|
||||
@@ -95,6 +96,25 @@ dependencies = [
|
||||
|
||||
# This is used for parsing multipart responses
|
||||
"python-multipart>=0.0.9",
|
||||
|
||||
# Transitive dependency constraints
|
||||
# These dependencies aren't directly required by Synapse.
|
||||
# However, in order for Synapse to build, Synapse requires a higher minimum version
|
||||
# for these dependencies than the minimum specified by the direct dependency.
|
||||
# We should periodically check to see if these dependencies are still necessary and
|
||||
# remove any that are no longer required.
|
||||
"cffi>=1.15", # via cryptography
|
||||
"pynacl>=1.3", # via signedjson
|
||||
"pyparsing>=2.4", # via packaging
|
||||
"pyrsistent>=0.18.0", # via jsonschema
|
||||
"requests>=2.16.0", # 2.16.0+ no longer vendors urllib3, avoiding Python 3.10+ incompatibility
|
||||
"urllib3>=1.26.5", # via treq; 1.26.5 fixes Python 3.10+ collections.abc compatibility
|
||||
# 5.2 is the current version in Debian oldstable. If we don't care to support that, then 5.4 is
|
||||
# the minimum version from Ubuntu 22.04 and RHEL 9. (as of 2025-12)
|
||||
# When bumping this version to 6.2 or above, refer to https://github.com/element-hq/synapse/pull/19274
|
||||
# for details of Synapse improvements that may be unlocked. Particularly around the use of `|`
|
||||
# syntax with zope interface types.
|
||||
"zope-interface>=5.2", # via twisted
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
@@ -104,7 +124,16 @@ postgres = [
|
||||
"psycopg2cffi>=2.8;platform_python_implementation == 'PyPy'",
|
||||
"psycopg2cffi-compat==1.1;platform_python_implementation == 'PyPy'",
|
||||
]
|
||||
saml2 = ["pysaml2>=4.5.0"]
|
||||
saml2 = [
|
||||
"pysaml2>=4.5.0",
|
||||
|
||||
# Transitive dependencies from pysaml2
|
||||
# These dependencies aren't directly required by Synapse.
|
||||
# However, in order for Synapse to build, Synapse requires a higher minimum version
|
||||
# for these dependencies than the minimum specified by the direct dependency.
|
||||
"defusedxml>=0.7.1", # via pysaml2
|
||||
"pytz>=2018.3", # via pysaml2
|
||||
]
|
||||
oidc = ["authlib>=0.15.1"]
|
||||
# systemd-python is necessary for logging to the systemd journal via
|
||||
# `systemd.journal.JournalHandler`, as is documented in
|
||||
@@ -112,13 +141,23 @@ oidc = ["authlib>=0.15.1"]
|
||||
systemd = ["systemd-python>=231"]
|
||||
url-preview = ["lxml>=4.6.3"]
|
||||
sentry = ["sentry-sdk>=0.7.2"]
|
||||
opentracing = ["jaeger-client>=4.2.0", "opentracing>=2.2.0"]
|
||||
opentracing = [
|
||||
"jaeger-client>=4.2.0",
|
||||
"opentracing>=2.2.0",
|
||||
|
||||
# Transitive dependencies from jaeger-client
|
||||
# These dependencies aren't directly required by Synapse.
|
||||
# However, in order for Synapse to build, Synapse requires a higher minimum version
|
||||
# for these dependencies than the minimum specified by the direct dependency.
|
||||
"thrift>=0.10", # via jaeger-client
|
||||
"tornado>=6.0", # via jaeger-client
|
||||
]
|
||||
jwt = ["authlib"]
|
||||
# hiredis is not a *strict* dependency, but it makes things much faster.
|
||||
# (if it is not installed, we fall back to slow code.)
|
||||
redis = ["txredisapi>=1.4.7", "hiredis"]
|
||||
redis = ["txredisapi>=1.4.7", "hiredis>=0.3"]
|
||||
# Required to use experimental `caches.track_memory_usage` config option.
|
||||
cache-memory = ["pympler"]
|
||||
cache-memory = ["pympler>=1.0"]
|
||||
# If this is updated, don't forget to update the equivalent lines in
|
||||
# `dependency-groups.dev` below.
|
||||
test = ["parameterized>=0.9.0", "idna>=3.3"]
|
||||
@@ -149,12 +188,22 @@ all = [
|
||||
# opentracing
|
||||
"jaeger-client>=4.2.0", "opentracing>=2.2.0",
|
||||
# redis
|
||||
"txredisapi>=1.4.7", "hiredis",
|
||||
"txredisapi>=1.4.7", "hiredis>=0.3",
|
||||
# cache-memory
|
||||
"pympler",
|
||||
# 1.0 added support for python 3.10, our current minimum supported python version
|
||||
"pympler>=1.0",
|
||||
# omitted:
|
||||
# - test: it's useful to have this separate from dev deps in the olddeps job
|
||||
# - systemd: this is a system-based requirement
|
||||
|
||||
# Transitive dependencies
|
||||
# These dependencies aren't directly required by Synapse.
|
||||
# However, in order for Synapse to build, Synapse requires a higher minimum version
|
||||
# for these dependencies than the minimum specified by the direct dependency.
|
||||
"defusedxml>=0.7.1", # via pysaml2
|
||||
"pytz>=2018.3", # via pysaml2
|
||||
"thrift>=0.10", # via jaeger-client
|
||||
"tornado>=6.0", # via jaeger-client
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
@@ -339,15 +388,10 @@ select = [
|
||||
"G",
|
||||
# pyupgrade
|
||||
"UP006",
|
||||
"UP007",
|
||||
"UP045",
|
||||
]
|
||||
extend-safe-fixes = [
|
||||
# pyupgrade rules compatible with Python >= 3.9
|
||||
"UP006",
|
||||
"UP007",
|
||||
# pyupgrade rules compatible with Python >= 3.10
|
||||
"UP045",
|
||||
# Allow ruff to automatically fix trailing spaces within a multi-line string/comment.
|
||||
"W293"
|
||||
]
|
||||
@@ -427,9 +471,6 @@ skip = "cp3??t-* *i686* *macosx*"
|
||||
enable = "pypy"
|
||||
|
||||
# We need a rust compiler.
|
||||
#
|
||||
# We temporarily pin Rust to 1.82.0 to work around
|
||||
# https://github.com/element-hq/synapse/issues/17988
|
||||
before-all = "sh .ci/before_build_wheel.sh"
|
||||
environment= { PATH = "$PATH:$HOME/.cargo/bin" }
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@ import sqlglot.expressions
|
||||
|
||||
SCHEMA_FILE_REGEX = re.compile(r"^synapse/storage/schema/(.*)/delta/(.*)/(.*)$")
|
||||
|
||||
|
||||
# The base branch we want to check against. We use the main development branch
|
||||
# on the assumption that is what we are developing against.
|
||||
DEVELOP_BRANCH = "develop"
|
||||
|
||||
@@ -145,7 +145,7 @@ def request(
|
||||
print("Requesting %s" % dest, file=sys.stderr)
|
||||
|
||||
s = requests.Session()
|
||||
s.mount("matrix-federation://", MatrixConnectionAdapter())
|
||||
s.mount("matrix-federation://", MatrixConnectionAdapter(verify_tls=verify_tls))
|
||||
|
||||
headers: dict[str, str] = {
|
||||
"Authorization": authorization_headers[0],
|
||||
@@ -267,6 +267,17 @@ def read_args_from_config(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
class MatrixConnectionAdapter(HTTPAdapter):
|
||||
"""
|
||||
A Matrix federation-aware HTTP Adapter.
|
||||
"""
|
||||
|
||||
verify_tls: bool
|
||||
"""whether to verify the remote server's TLS certificate."""
|
||||
|
||||
def __init__(self, verify_tls: bool = True) -> None:
|
||||
self.verify_tls = verify_tls
|
||||
super().__init__()
|
||||
|
||||
def send(
|
||||
self,
|
||||
request: PreparedRequest,
|
||||
@@ -280,7 +291,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
|
||||
assert isinstance(request.url, str)
|
||||
parsed = urlparse.urlsplit(request.url)
|
||||
server_name = parsed.netloc
|
||||
well_known = self._get_well_known(parsed.netloc)
|
||||
well_known = self._get_well_known(parsed.netloc, verify_tls=self.verify_tls)
|
||||
|
||||
if well_known:
|
||||
server_name = well_known
|
||||
@@ -318,6 +329,21 @@ class MatrixConnectionAdapter(HTTPAdapter):
|
||||
print(
|
||||
f"Connecting to {host}:{port} with SNI {ssl_server_name}", file=sys.stderr
|
||||
)
|
||||
|
||||
if proxies:
|
||||
scheme = parsed.scheme
|
||||
if isinstance(scheme, bytes):
|
||||
scheme = scheme.decode("utf-8")
|
||||
|
||||
proxy_for_scheme = proxies.get(scheme)
|
||||
if proxy_for_scheme:
|
||||
return self.proxy_manager_for(proxy_for_scheme).connection_from_host(
|
||||
host,
|
||||
port=port,
|
||||
scheme="https",
|
||||
pool_kwargs={"server_hostname": ssl_server_name},
|
||||
)
|
||||
|
||||
return self.poolmanager.connection_from_host(
|
||||
host,
|
||||
port=port,
|
||||
@@ -368,7 +394,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
|
||||
return server_name, 8448, server_name
|
||||
|
||||
@staticmethod
|
||||
def _get_well_known(server_name: str) -> str | None:
|
||||
def _get_well_known(server_name: str, verify_tls: bool = True) -> str | None:
|
||||
if ":" in server_name:
|
||||
# explicit port, or ipv6 literal. Either way, no .well-known
|
||||
return None
|
||||
@@ -379,7 +405,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
|
||||
print(f"fetching {uri}", file=sys.stderr)
|
||||
|
||||
try:
|
||||
resp = requests.get(uri)
|
||||
resp = requests.get(uri, verify=verify_tls)
|
||||
if resp.status_code != 200:
|
||||
print("%s gave %i" % (uri, resp.status_code), file=sys.stderr)
|
||||
return None
|
||||
|
||||
@@ -36,6 +36,7 @@ from typing import (
|
||||
Awaitable,
|
||||
Callable,
|
||||
NoReturn,
|
||||
Optional,
|
||||
cast,
|
||||
)
|
||||
from wsgiref.simple_server import WSGIServer
|
||||
@@ -455,7 +456,7 @@ def listen_http(
|
||||
root_resource: Resource,
|
||||
version_string: str,
|
||||
max_request_body_size: int,
|
||||
context_factory: IOpenSSLContextFactory | None,
|
||||
context_factory: Optional[IOpenSSLContextFactory],
|
||||
reactor: ISynapseReactor = reactor,
|
||||
) -> list[Port]:
|
||||
"""
|
||||
|
||||
@@ -24,7 +24,7 @@ import logging
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import Mapping, Sequence
|
||||
from typing import Mapping, Optional, Sequence
|
||||
|
||||
from twisted.internet import defer, task
|
||||
|
||||
@@ -291,7 +291,7 @@ def load_config(argv_options: list[str]) -> tuple[HomeServerConfig, argparse.Nam
|
||||
|
||||
def create_homeserver(
|
||||
config: HomeServerConfig,
|
||||
reactor: ISynapseReactor | None = None,
|
||||
reactor: Optional[ISynapseReactor] = None,
|
||||
) -> AdminCmdServer:
|
||||
"""
|
||||
Create a homeserver instance for the Synapse admin command process.
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#
|
||||
import logging
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
@@ -335,7 +336,7 @@ def load_config(argv_options: list[str]) -> HomeServerConfig:
|
||||
|
||||
def create_homeserver(
|
||||
config: HomeServerConfig,
|
||||
reactor: ISynapseReactor | None = None,
|
||||
reactor: Optional[ISynapseReactor] = None,
|
||||
) -> GenericWorkerServer:
|
||||
"""
|
||||
Create a homeserver instance for the Synapse worker process.
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from typing import Iterable
|
||||
from typing import Iterable, Optional
|
||||
|
||||
from twisted.internet.tcp import Port
|
||||
from twisted.web.resource import EncodingResourceWrapper, Resource
|
||||
@@ -350,7 +350,7 @@ def load_or_generate_config(argv_options: list[str]) -> HomeServerConfig:
|
||||
|
||||
def create_homeserver(
|
||||
config: HomeServerConfig,
|
||||
reactor: ISynapseReactor | None = None,
|
||||
reactor: Optional[ISynapseReactor] = None,
|
||||
) -> SynapseHomeServer:
|
||||
"""
|
||||
Create a homeserver instance for the Synapse main process.
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
|
||||
@@ -74,7 +74,7 @@ class DelayedEventsHandler:
|
||||
cfg=self._config.ratelimiting.rc_delayed_event_mgmt,
|
||||
)
|
||||
|
||||
self._next_delayed_event_call: IDelayedCall | None = None
|
||||
self._next_delayed_event_call: Optional[IDelayedCall] = None
|
||||
|
||||
# The current position in the current_state_delta stream
|
||||
self._event_pos: int | None = None
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
import logging
|
||||
import random
|
||||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING, Any, Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
@@ -111,7 +111,7 @@ class MessageHandler:
|
||||
|
||||
# The scheduled call to self._expire_event. None if no call is currently
|
||||
# scheduled.
|
||||
self._scheduled_expiry: IDelayedCall | None = None
|
||||
self._scheduled_expiry: Optional[IDelayedCall] = None
|
||||
|
||||
if not hs.config.worker.worker_app:
|
||||
self.hs.run_as_background_process(
|
||||
|
||||
@@ -17,6 +17,7 @@ import logging
|
||||
from itertools import chain
|
||||
from typing import TYPE_CHECKING, AbstractSet, Mapping
|
||||
|
||||
import attr
|
||||
from prometheus_client import Histogram
|
||||
from typing_extensions import assert_never
|
||||
|
||||
@@ -62,6 +63,7 @@ from synapse.types.handlers.sliding_sync import (
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
PerConnectionState,
|
||||
RoomLazyMembershipChanges,
|
||||
RoomSyncConfig,
|
||||
SlidingSyncConfig,
|
||||
SlidingSyncResult,
|
||||
@@ -106,7 +108,7 @@ class SlidingSyncHandler:
|
||||
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
|
||||
self.connection_store = SlidingSyncConnectionStore(self.store)
|
||||
self.connection_store = SlidingSyncConnectionStore(self.clock, self.store)
|
||||
self.extensions = SlidingSyncExtensionHandler(hs)
|
||||
self.room_lists = SlidingSyncRoomLists(hs)
|
||||
|
||||
@@ -981,14 +983,15 @@ class SlidingSyncHandler:
|
||||
#
|
||||
# Calculate the `StateFilter` based on the `required_state` for the room
|
||||
required_state_filter = StateFilter.none()
|
||||
# The requested `required_state_map` with the lazy membership expanded and
|
||||
# `$ME` replaced with the user's ID. This allows us to see what membership we've
|
||||
# sent down to the client in the next request.
|
||||
#
|
||||
# Make a copy so we can modify it. Still need to be careful to make a copy of
|
||||
# the state key sets if we want to add/remove from them. We could make a deep
|
||||
# copy but this saves us some work.
|
||||
expanded_required_state_map = dict(room_sync_config.required_state_map)
|
||||
|
||||
# Keep track of which users' state we may need to fetch. We split this
|
||||
# into explicit users and lazy loaded users.
|
||||
explicit_user_state = set()
|
||||
lazy_load_user_ids = set()
|
||||
|
||||
# Whether lazy-loading of room members is enabled.
|
||||
lazy_load_room_members = False
|
||||
|
||||
if room_membership_for_user_at_to_token.membership not in (
|
||||
Membership.INVITE,
|
||||
Membership.KNOCK,
|
||||
@@ -1036,7 +1039,6 @@ class SlidingSyncHandler:
|
||||
else:
|
||||
required_state_types: list[tuple[str, str | None]] = []
|
||||
num_wild_state_keys = 0
|
||||
lazy_load_room_members = False
|
||||
num_others = 0
|
||||
for (
|
||||
state_type,
|
||||
@@ -1068,43 +1070,60 @@ class SlidingSyncHandler:
|
||||
timeline_event.state_key
|
||||
)
|
||||
|
||||
# The client needs to know the membership of everyone in
|
||||
# the timeline we're returning.
|
||||
lazy_load_user_ids.update(timeline_membership)
|
||||
|
||||
# Update the required state filter so we pick up the new
|
||||
# membership
|
||||
for user_id in timeline_membership:
|
||||
required_state_types.append(
|
||||
(EventTypes.Member, user_id)
|
||||
)
|
||||
if limited or initial:
|
||||
# If the timeline is limited, we only need to
|
||||
# return the membership changes for people in
|
||||
# the timeline.
|
||||
for user_id in timeline_membership:
|
||||
required_state_types.append(
|
||||
(EventTypes.Member, user_id)
|
||||
)
|
||||
else:
|
||||
# For non-limited timelines we always return all
|
||||
# membership changes. This is so that clients
|
||||
# who have fetched the full membership list
|
||||
# already can continue to maintain it for
|
||||
# non-limited syncs.
|
||||
#
|
||||
# This assumes that for non-limited syncs there
|
||||
# won't be many membership changes that wouldn't
|
||||
# have been included already (this can only
|
||||
# happen if membership state was rolled back due
|
||||
# to state resolution anyway).
|
||||
#
|
||||
# `None` is a wildcard in the `StateFilter`
|
||||
required_state_types.append((EventTypes.Member, None))
|
||||
|
||||
# Add an explicit entry for each user in the timeline
|
||||
#
|
||||
# Make a new set or copy of the state key set so we can
|
||||
# modify it without affecting the original
|
||||
# `required_state_map`
|
||||
expanded_required_state_map[EventTypes.Member] = (
|
||||
expanded_required_state_map.get(
|
||||
EventTypes.Member, set()
|
||||
# Record the extra members we're returning.
|
||||
lazy_load_user_ids.update(
|
||||
state_key
|
||||
for event_type, state_key in room_state_delta_id_map
|
||||
if event_type == EventTypes.Member
|
||||
)
|
||||
| timeline_membership
|
||||
)
|
||||
elif state_key == StateValues.ME:
|
||||
else:
|
||||
num_others += 1
|
||||
required_state_types.append((state_type, user.to_string()))
|
||||
|
||||
# Replace `$ME` with the user's ID so we can deduplicate
|
||||
# when someone requests the same state with `$ME` or with
|
||||
# their user ID.
|
||||
#
|
||||
# Make a new set or copy of the state key set so we can
|
||||
# modify it without affecting the original
|
||||
# `required_state_map`
|
||||
expanded_required_state_map[EventTypes.Member] = (
|
||||
expanded_required_state_map.get(
|
||||
EventTypes.Member, set()
|
||||
)
|
||||
| {user.to_string()}
|
||||
normalized_state_key = state_key
|
||||
if state_key == StateValues.ME:
|
||||
normalized_state_key = user.to_string()
|
||||
|
||||
if state_type == EventTypes.Member:
|
||||
# Also track explicitly requested member state for
|
||||
# lazy membership tracking.
|
||||
explicit_user_state.add(normalized_state_key)
|
||||
|
||||
required_state_types.append(
|
||||
(state_type, normalized_state_key)
|
||||
)
|
||||
else:
|
||||
num_others += 1
|
||||
required_state_types.append((state_type, state_key))
|
||||
|
||||
set_tag(
|
||||
SynapseTags.FUNC_ARG_PREFIX
|
||||
@@ -1122,6 +1141,10 @@ class SlidingSyncHandler:
|
||||
|
||||
required_state_filter = StateFilter.from_types(required_state_types)
|
||||
|
||||
# Remove any explicitly requested user state from the lazy-loaded set,
|
||||
# as we track them separately.
|
||||
lazy_load_user_ids -= explicit_user_state
|
||||
|
||||
# We need this base set of info for the response so let's just fetch it along
|
||||
# with the `required_state` for the room
|
||||
hero_room_state = [
|
||||
@@ -1149,6 +1172,22 @@ class SlidingSyncHandler:
|
||||
# We can return all of the state that was requested if this was the first
|
||||
# time we've sent the room down this connection.
|
||||
room_state: StateMap[EventBase] = {}
|
||||
|
||||
# Includes the state for the heroes if we need them (may contain other
|
||||
# state as well).
|
||||
hero_membership_state: StateMap[EventBase] = {}
|
||||
|
||||
# By default, we mark all `lazy_load_user_ids` as being sent down
|
||||
# for the first time in this sync. We later check if we sent any of them
|
||||
# down previously and update `returned_user_id_to_last_seen_ts_map` if
|
||||
# we have.
|
||||
returned_user_id_to_last_seen_ts_map = {}
|
||||
if lazy_load_room_members:
|
||||
returned_user_id_to_last_seen_ts_map = dict.fromkeys(lazy_load_user_ids)
|
||||
new_connection_state.room_lazy_membership[room_id] = RoomLazyMembershipChanges(
|
||||
returned_user_id_to_last_seen_ts_map=returned_user_id_to_last_seen_ts_map
|
||||
)
|
||||
|
||||
if initial:
|
||||
room_state = await self.get_current_state_at(
|
||||
room_id=room_id,
|
||||
@@ -1156,28 +1195,97 @@ class SlidingSyncHandler:
|
||||
state_filter=state_filter,
|
||||
to_token=to_token,
|
||||
)
|
||||
|
||||
# The `room_state` includes the hero membership state if needed.
|
||||
# We'll later filter this down so we don't need to do so here.
|
||||
hero_membership_state = room_state
|
||||
else:
|
||||
assert from_token is not None
|
||||
assert from_bound is not None
|
||||
|
||||
if prev_room_sync_config is not None:
|
||||
# Define `all_required_user_state` as all user state we want, which
|
||||
# is the explicitly requested members, any needed for lazy
|
||||
# loading, and users whose membership has changed.
|
||||
all_required_user_state = explicit_user_state | lazy_load_user_ids
|
||||
for state_type, state_key in room_state_delta_id_map:
|
||||
if state_type == EventTypes.Member:
|
||||
all_required_user_state.add(state_key)
|
||||
|
||||
# We need to know what user state we previously sent down the
|
||||
# connection so we can determine what has changed.
|
||||
#
|
||||
# We need to fetch all users whose memberships we may want
|
||||
# to send down this sync. This includes (and matches
|
||||
# `all_required_user_state`):
|
||||
# 1. Explicitly requested user state
|
||||
# 2. Lazy loaded members, i.e. users who appear in the
|
||||
# timeline.
|
||||
# 3. The users whose membership has changed in the room, i.e.
|
||||
# in the state deltas.
|
||||
#
|
||||
# This is to correctly handle the cases where a user was
|
||||
# previously sent down as a lazy loaded member:
|
||||
# - and is now explicitly requested (so shouldn't be sent down
|
||||
# again); or
|
||||
# - their membership has changed (so we need to invalidate
|
||||
# their entry in the lazy loaded table if we don't send the
|
||||
# change down).
|
||||
if all_required_user_state:
|
||||
previously_returned_user_to_last_seen = (
|
||||
await self.store.get_sliding_sync_connection_lazy_members(
|
||||
connection_position=from_token.connection_position,
|
||||
room_id=room_id,
|
||||
user_ids=all_required_user_state,
|
||||
)
|
||||
)
|
||||
|
||||
# Update the room lazy membership changes to track which
|
||||
# lazy loaded members were needed for this sync. This is so
|
||||
# that we can correctly track the last time we sent down
|
||||
# users' membership (and so can evict old membership state
|
||||
# from the DB tables).
|
||||
returned_user_id_to_last_seen_ts_map.update(
|
||||
(user_id, timestamp)
|
||||
for user_id, timestamp in previously_returned_user_to_last_seen.items()
|
||||
if user_id in lazy_load_user_ids
|
||||
)
|
||||
else:
|
||||
previously_returned_user_to_last_seen = {}
|
||||
|
||||
# Check if there are any changes to the required state config
|
||||
# that we need to handle.
|
||||
changed_required_state_map, added_state_filter = (
|
||||
_required_state_changes(
|
||||
user.to_string(),
|
||||
prev_required_state_map=prev_room_sync_config.required_state_map,
|
||||
request_required_state_map=expanded_required_state_map,
|
||||
state_deltas=room_state_delta_id_map,
|
||||
)
|
||||
changes_return = _required_state_changes(
|
||||
user.to_string(),
|
||||
prev_required_state_map=prev_room_sync_config.required_state_map,
|
||||
request_required_state_map=room_sync_config.required_state_map,
|
||||
previously_returned_lazy_user_ids=previously_returned_user_to_last_seen.keys(),
|
||||
request_lazy_load_user_ids=lazy_load_user_ids,
|
||||
state_deltas=room_state_delta_id_map,
|
||||
)
|
||||
changed_required_state_map = changes_return.changed_required_state_map
|
||||
|
||||
if added_state_filter:
|
||||
new_connection_state.room_lazy_membership[
|
||||
room_id
|
||||
].invalidated_user_ids = changes_return.lazy_members_invalidated
|
||||
|
||||
# Add any previously returned explicit memberships to the lazy
|
||||
# loaded table. This happens when a client requested explicit
|
||||
# members and then converted them to lazy loading.
|
||||
for user_id in changes_return.extra_users_to_add_to_lazy_cache:
|
||||
# We don't know the right timestamp to use here, as we don't
|
||||
# know the last time we would have sent the membership down.
|
||||
# So we don't overwrite it if we have a timestamp already,
|
||||
# and fallback to `None` (which means now) if we don't.
|
||||
returned_user_id_to_last_seen_ts_map.setdefault(user_id, None)
|
||||
|
||||
if changes_return.added_state_filter:
|
||||
# Some state entries got added, so we pull out the current
|
||||
# state for them. If we don't do this we'd only send down new deltas.
|
||||
state_ids = await self.get_current_state_ids_at(
|
||||
room_id=room_id,
|
||||
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
|
||||
state_filter=added_state_filter,
|
||||
state_filter=changes_return.added_state_filter,
|
||||
to_token=to_token,
|
||||
)
|
||||
room_state_delta_id_map.update(state_ids)
|
||||
@@ -1189,6 +1297,7 @@ class SlidingSyncHandler:
|
||||
|
||||
# If the membership changed and we have to get heroes, get the remaining
|
||||
# heroes from the state
|
||||
hero_membership_state = {}
|
||||
if hero_user_ids:
|
||||
hero_membership_state = await self.get_current_state_at(
|
||||
room_id=room_id,
|
||||
@@ -1196,7 +1305,6 @@ class SlidingSyncHandler:
|
||||
state_filter=StateFilter.from_types(hero_room_state),
|
||||
to_token=to_token,
|
||||
)
|
||||
room_state.update(hero_membership_state)
|
||||
|
||||
required_room_state: StateMap[EventBase] = {}
|
||||
if required_state_filter != StateFilter.none():
|
||||
@@ -1219,7 +1327,7 @@ class SlidingSyncHandler:
|
||||
# Assemble heroes: extract the info from the state we just fetched
|
||||
heroes: list[SlidingSyncResult.RoomResult.StrippedHero] = []
|
||||
for hero_user_id in hero_user_ids:
|
||||
member_event = room_state.get((EventTypes.Member, hero_user_id))
|
||||
member_event = hero_membership_state.get((EventTypes.Member, hero_user_id))
|
||||
if member_event is not None:
|
||||
heroes.append(
|
||||
SlidingSyncResult.RoomResult.StrippedHero(
|
||||
@@ -1281,7 +1389,7 @@ class SlidingSyncHandler:
|
||||
bump_stamp = 0
|
||||
|
||||
room_sync_required_state_map_to_persist: Mapping[str, AbstractSet[str]] = (
|
||||
expanded_required_state_map
|
||||
room_sync_config.required_state_map
|
||||
)
|
||||
if changed_required_state_map:
|
||||
room_sync_required_state_map_to_persist = changed_required_state_map
|
||||
@@ -1471,13 +1579,37 @@ class SlidingSyncHandler:
|
||||
return None
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class _RequiredStateChangesReturn:
|
||||
"""Return type for _required_state_changes."""
|
||||
|
||||
changed_required_state_map: Mapping[str, AbstractSet[str]] | None
|
||||
"""The updated required state map to store in the room config, or None if
|
||||
there is no change."""
|
||||
|
||||
added_state_filter: StateFilter
|
||||
"""The state filter to use to fetch any additional current state that needs
|
||||
to be returned to the client."""
|
||||
|
||||
extra_users_to_add_to_lazy_cache: AbstractSet[str] = frozenset()
|
||||
"""The set of user IDs we should add to the lazy members cache that we had
|
||||
previously returned. Handles the case where a user was previously sent down
|
||||
explicitly but is now being lazy loaded."""
|
||||
|
||||
lazy_members_invalidated: AbstractSet[str] = frozenset()
|
||||
"""The set of user IDs whose membership has changed but we didn't send down,
|
||||
so we need to invalidate them from the cache."""
|
||||
|
||||
|
||||
def _required_state_changes(
|
||||
user_id: str,
|
||||
*,
|
||||
prev_required_state_map: Mapping[str, AbstractSet[str]],
|
||||
request_required_state_map: Mapping[str, AbstractSet[str]],
|
||||
previously_returned_lazy_user_ids: AbstractSet[str],
|
||||
request_lazy_load_user_ids: AbstractSet[str],
|
||||
state_deltas: StateMap[str],
|
||||
) -> tuple[Mapping[str, AbstractSet[str]] | None, StateFilter]:
|
||||
) -> _RequiredStateChangesReturn:
|
||||
"""Calculates the changes between the required state room config from the
|
||||
previous requests compared with the current request.
|
||||
|
||||
@@ -1491,14 +1623,62 @@ def _required_state_changes(
|
||||
added, removed and then added again to the required state. In that case we
|
||||
only want to re-send that entry down sync if it has changed.
|
||||
|
||||
Returns:
|
||||
A 2-tuple of updated required state config (or None if there is no update)
|
||||
and the state filter to use to fetch extra current state that we need to
|
||||
return.
|
||||
Args:
|
||||
user_id: The user ID of the user making the request.
|
||||
prev_required_state_map: The required state map from the previous
|
||||
request.
|
||||
request_required_state_map: The required state map from the current
|
||||
request.
|
||||
previously_returned_lazy_user_ids: The set of user IDs whose membership
|
||||
we have previously returned to the client due to lazy loading. This
|
||||
is filtered to only include users who have either sent events in the
|
||||
`timeline`, `required_state` or whose membership changed.
|
||||
request_lazy_load_user_ids: The set of user IDs whose lazy-loaded
|
||||
membership is required for this request.
|
||||
state_deltas: The state deltas in the room in the request token range,
|
||||
considering user membership. See `get_current_state_deltas_for_room`
|
||||
for more details.
|
||||
"""
|
||||
|
||||
# First we find any lazy members that have been invalidated due to state
|
||||
# changes that we are not sending down.
|
||||
lazy_members_invalidated = set()
|
||||
for event_type, state_key in state_deltas:
|
||||
if event_type != EventTypes.Member:
|
||||
continue
|
||||
|
||||
if state_key in request_lazy_load_user_ids:
|
||||
# Because it's part of the `request_lazy_load_user_ids`, we're going to
|
||||
# send this member change down.
|
||||
continue
|
||||
|
||||
if state_key not in previously_returned_lazy_user_ids:
|
||||
# We've not previously returned this member so nothing to
|
||||
# invalidate.
|
||||
continue
|
||||
|
||||
lazy_members_invalidated.add(state_key)
|
||||
|
||||
if prev_required_state_map == request_required_state_map:
|
||||
# There has been no change. Return immediately.
|
||||
return None, StateFilter.none()
|
||||
# There has been no change in state, just need to check lazy members.
|
||||
newly_returned_lazy_members = (
|
||||
request_lazy_load_user_ids - previously_returned_lazy_user_ids
|
||||
)
|
||||
if newly_returned_lazy_members:
|
||||
# There are some new lazy members we need to fetch.
|
||||
added_types: list[tuple[str, str | None]] = []
|
||||
for new_user_id in newly_returned_lazy_members:
|
||||
added_types.append((EventTypes.Member, new_user_id))
|
||||
|
||||
added_state_filter = StateFilter.from_types(added_types)
|
||||
else:
|
||||
added_state_filter = StateFilter.none()
|
||||
|
||||
return _RequiredStateChangesReturn(
|
||||
changed_required_state_map=None,
|
||||
added_state_filter=added_state_filter,
|
||||
lazy_members_invalidated=lazy_members_invalidated,
|
||||
)
|
||||
|
||||
prev_wildcard = prev_required_state_map.get(StateValues.WILDCARD, set())
|
||||
request_wildcard = request_required_state_map.get(StateValues.WILDCARD, set())
|
||||
@@ -1508,17 +1688,29 @@ def _required_state_changes(
|
||||
# already fetching everything, we don't have to fetch anything now that they've
|
||||
# narrowed.
|
||||
if StateValues.WILDCARD in prev_wildcard:
|
||||
return request_required_state_map, StateFilter.none()
|
||||
return _RequiredStateChangesReturn(
|
||||
changed_required_state_map=request_required_state_map,
|
||||
added_state_filter=StateFilter.none(),
|
||||
lazy_members_invalidated=lazy_members_invalidated,
|
||||
)
|
||||
|
||||
# If a event type wildcard has been added or removed we don't try and do
|
||||
# anything fancy, and instead always update the effective room required
|
||||
# state config to match the request.
|
||||
if request_wildcard - prev_wildcard:
|
||||
# Some keys were added, so we need to fetch everything
|
||||
return request_required_state_map, StateFilter.all()
|
||||
return _RequiredStateChangesReturn(
|
||||
changed_required_state_map=request_required_state_map,
|
||||
added_state_filter=StateFilter.all(),
|
||||
lazy_members_invalidated=lazy_members_invalidated,
|
||||
)
|
||||
if prev_wildcard - request_wildcard:
|
||||
# Keys were only removed, so we don't have to fetch everything.
|
||||
return request_required_state_map, StateFilter.none()
|
||||
return _RequiredStateChangesReturn(
|
||||
changed_required_state_map=request_required_state_map,
|
||||
added_state_filter=StateFilter.none(),
|
||||
lazy_members_invalidated=lazy_members_invalidated,
|
||||
)
|
||||
|
||||
# Contains updates to the required state map compared with the previous room
|
||||
# config. This has the same format as `RoomSyncConfig.required_state`
|
||||
@@ -1550,6 +1742,17 @@ def _required_state_changes(
|
||||
# Nothing *added*, so we skip. Removals happen below.
|
||||
continue
|
||||
|
||||
# Handle the special case of adding `$LAZY` membership, where we want to
|
||||
# always record the change to be lazy loading, as we immediately start
|
||||
# using the lazy loading tables so there is no point *not* recording the
|
||||
# change to lazy load in the effective room config.
|
||||
if event_type == EventTypes.Member:
|
||||
old_state_key_lazy = StateValues.LAZY in old_state_keys
|
||||
request_state_key_lazy = StateValues.LAZY in request_state_keys
|
||||
if not old_state_key_lazy and request_state_key_lazy:
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
# We only remove state keys from the effective state if they've been
|
||||
# removed from the request *and* the state has changed. This ensures
|
||||
# that if a client removes and then re-adds a state key, we only send
|
||||
@@ -1620,9 +1823,31 @@ def _required_state_changes(
|
||||
# LAZY values should also be ignore for event types that are
|
||||
# not membership.
|
||||
pass
|
||||
elif event_type == EventTypes.Member:
|
||||
if state_key not in previously_returned_lazy_user_ids:
|
||||
# Only add *explicit* members we haven't previously sent
|
||||
# down.
|
||||
added.append((event_type, state_key))
|
||||
else:
|
||||
added.append((event_type, state_key))
|
||||
|
||||
previously_required_state_members = set(
|
||||
prev_required_state_map.get(EventTypes.Member, ())
|
||||
)
|
||||
if StateValues.ME in previously_required_state_members:
|
||||
previously_required_state_members.add(user_id)
|
||||
|
||||
# We also need to pull out any lazy members that are now required but
|
||||
# haven't previously been returned.
|
||||
for required_user_id in (
|
||||
request_lazy_load_user_ids
|
||||
# Remove previously returned users
|
||||
- previously_returned_lazy_user_ids
|
||||
# Exclude previously explicitly requested members.
|
||||
- previously_required_state_members
|
||||
):
|
||||
added.append((EventTypes.Member, required_user_id))
|
||||
|
||||
added_state_filter = StateFilter.from_types(added)
|
||||
|
||||
# Figure out what changes we need to apply to the effective required state
|
||||
@@ -1663,13 +1888,25 @@ def _required_state_changes(
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
# When handling $LAZY membership, we want to either a) not update the
|
||||
# state or b) update it to match the request. This is to avoid churn of
|
||||
# the effective required state for rooms (we deduplicate required state
|
||||
# between rooms), and because we can store the previously returned
|
||||
# explicit memberships with the lazy loaded memberships.
|
||||
if event_type == EventTypes.Member:
|
||||
old_state_key_lazy = StateValues.LAZY in old_state_keys
|
||||
request_state_key_lazy = StateValues.LAZY in request_state_keys
|
||||
has_lazy = old_state_key_lazy or request_state_key_lazy
|
||||
|
||||
# If a "$LAZY" has been added or removed we always update to match
|
||||
# the request.
|
||||
if old_state_key_lazy != request_state_key_lazy:
|
||||
# If a "$LAZY" has been added or removed we always update the effective room
|
||||
# required state config to match the request.
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
# Or if we have lazy membership and there are invalidated
|
||||
# explicit memberships.
|
||||
if has_lazy and invalidated_state_keys:
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
@@ -1684,6 +1921,28 @@ def _required_state_changes(
|
||||
if invalidated_state_keys:
|
||||
changes[event_type] = old_state_keys - invalidated_state_keys
|
||||
|
||||
# Check for any explicit membership changes that were removed that we can
|
||||
# add to the lazy members previously returned. This is so that we don't
|
||||
# return a user due to lazy loading if they were previously returned as an
|
||||
# explicit membership.
|
||||
users_to_add_to_lazy_cache: set[str] = set()
|
||||
|
||||
membership_changes = changes.get(EventTypes.Member, set())
|
||||
if membership_changes and StateValues.LAZY in request_state_keys:
|
||||
for state_key in prev_required_state_map.get(EventTypes.Member, set()):
|
||||
if state_key == StateValues.WILDCARD or state_key == StateValues.LAZY:
|
||||
# Ignore non-user IDs.
|
||||
continue
|
||||
|
||||
if state_key == StateValues.ME:
|
||||
# Normalize to proper user ID
|
||||
state_key = user_id
|
||||
|
||||
# We remember the user if they haven't been invalidated
|
||||
if (EventTypes.Member, state_key) not in state_deltas:
|
||||
users_to_add_to_lazy_cache.add(state_key)
|
||||
|
||||
new_required_state_map = None
|
||||
if changes:
|
||||
# Update the required state config based on the changes.
|
||||
new_required_state_map = dict(prev_required_state_map)
|
||||
@@ -1694,6 +1953,9 @@ def _required_state_changes(
|
||||
# Remove entries with empty state keys.
|
||||
new_required_state_map.pop(event_type, None)
|
||||
|
||||
return new_required_state_map, added_state_filter
|
||||
else:
|
||||
return None, added_state_filter
|
||||
return _RequiredStateChangesReturn(
|
||||
changed_required_state_map=new_required_state_map,
|
||||
added_state_filter=added_state_filter,
|
||||
lazy_members_invalidated=lazy_members_invalidated,
|
||||
extra_users_to_add_to_lazy_cache=users_to_add_to_lazy_cache,
|
||||
)
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import attr
|
||||
|
||||
@@ -25,9 +24,7 @@ from synapse.types.handlers.sliding_sync import (
|
||||
PerConnectionState,
|
||||
SlidingSyncConfig,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
from synapse.util.clock import Clock
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -61,7 +58,8 @@ class SlidingSyncConnectionStore:
|
||||
to mapping of room ID to `HaveSentRoom`.
|
||||
"""
|
||||
|
||||
store: "DataStore"
|
||||
clock: Clock
|
||||
store: DataStore
|
||||
|
||||
async def get_and_clear_connection_positions(
|
||||
self,
|
||||
@@ -101,7 +99,7 @@ class SlidingSyncConnectionStore:
|
||||
If there are no changes to the state this may return the same token as
|
||||
the existing per-connection state.
|
||||
"""
|
||||
if not new_connection_state.has_updates():
|
||||
if not new_connection_state.has_updates(self.clock):
|
||||
if from_token is not None:
|
||||
return from_token.connection_position
|
||||
else:
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
|
||||
@@ -125,7 +125,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||
# Guard to ensure we only have one process for refreshing remote profiles
|
||||
self._is_refreshing_remote_profiles = False
|
||||
# Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process`
|
||||
self._refresh_remote_profiles_call_later: IDelayedCall | None = None
|
||||
self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None
|
||||
|
||||
# Guard to ensure we only have one process for refreshing remote profiles
|
||||
# for the given servers.
|
||||
|
||||
@@ -28,6 +28,7 @@ from typing import (
|
||||
BinaryIO,
|
||||
Callable,
|
||||
Mapping,
|
||||
Optional,
|
||||
Protocol,
|
||||
)
|
||||
|
||||
@@ -313,7 +314,7 @@ class BlocklistingAgentWrapper(Agent):
|
||||
method: bytes,
|
||||
uri: bytes,
|
||||
headers: Headers | None = None,
|
||||
bodyProducer: IBodyProducer | None = None,
|
||||
bodyProducer: Optional[IBodyProducer] = None,
|
||||
) -> defer.Deferred:
|
||||
h = urllib.parse.urlparse(uri.decode("ascii"))
|
||||
|
||||
@@ -1033,7 +1034,7 @@ class BodyExceededMaxSize(Exception):
|
||||
class _DiscardBodyWithMaxSizeProtocol(protocol.Protocol):
|
||||
"""A protocol which immediately errors upon receiving data."""
|
||||
|
||||
transport: ITCPTransport | None = None
|
||||
transport: Optional[ITCPTransport] = None
|
||||
|
||||
def __init__(self, deferred: defer.Deferred):
|
||||
self.deferred = deferred
|
||||
@@ -1075,7 +1076,7 @@ class _MultipartParserProtocol(protocol.Protocol):
|
||||
Protocol to read and parse a MSC3916 multipart/mixed response
|
||||
"""
|
||||
|
||||
transport: ITCPTransport | None = None
|
||||
transport: Optional[ITCPTransport] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -1188,7 +1189,7 @@ class _MultipartParserProtocol(protocol.Protocol):
|
||||
class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
|
||||
"""A protocol which reads body to a stream, erroring if the body exceeds a maximum size."""
|
||||
|
||||
transport: ITCPTransport | None = None
|
||||
transport: Optional[ITCPTransport] = None
|
||||
|
||||
def __init__(
|
||||
self, stream: ByteWriteable, deferred: defer.Deferred, max_size: int | None
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
#
|
||||
import logging
|
||||
import urllib.parse
|
||||
from typing import Any, Generator
|
||||
from typing import Any, Generator, Optional
|
||||
from urllib.request import ( # type: ignore[attr-defined]
|
||||
proxy_bypass_environment,
|
||||
)
|
||||
@@ -173,7 +173,7 @@ class MatrixFederationAgent:
|
||||
method: bytes,
|
||||
uri: bytes,
|
||||
headers: Headers | None = None,
|
||||
bodyProducer: IBodyProducer | None = None,
|
||||
bodyProducer: Optional[IBodyProducer] = None,
|
||||
) -> Generator[defer.Deferred, Any, IResponse]:
|
||||
"""
|
||||
Args:
|
||||
|
||||
@@ -33,6 +33,7 @@ from typing import (
|
||||
Callable,
|
||||
Generic,
|
||||
Literal,
|
||||
Optional,
|
||||
TextIO,
|
||||
TypeVar,
|
||||
cast,
|
||||
@@ -691,7 +692,7 @@ class MatrixFederationHttpClient:
|
||||
destination_bytes, method_bytes, url_to_sign_bytes, json
|
||||
)
|
||||
data = encode_canonical_json(json)
|
||||
producer: IBodyProducer | None = QuieterFileBodyProducer(
|
||||
producer: Optional[IBodyProducer] = QuieterFileBodyProducer(
|
||||
BytesIO(data), cooperator=self._cooperator
|
||||
)
|
||||
else:
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
import json
|
||||
import logging
|
||||
import urllib.parse
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
from typing import TYPE_CHECKING, Any, Optional, cast
|
||||
|
||||
from twisted.internet import protocol
|
||||
from twisted.internet.interfaces import ITCPTransport
|
||||
@@ -237,7 +237,7 @@ class _ProxyResponseBody(protocol.Protocol):
|
||||
request.
|
||||
"""
|
||||
|
||||
transport: ITCPTransport | None = None
|
||||
transport: Optional[ITCPTransport] = None
|
||||
|
||||
def __init__(self, request: "SynapseRequest") -> None:
|
||||
self._request = request
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
import logging
|
||||
import random
|
||||
import re
|
||||
from typing import Any, Collection, Sequence, cast
|
||||
from typing import Any, Collection, Optional, Sequence, cast
|
||||
from urllib.parse import urlparse
|
||||
from urllib.request import ( # type: ignore[attr-defined]
|
||||
proxy_bypass_environment,
|
||||
@@ -119,8 +119,8 @@ class ProxyAgent(_AgentBase):
|
||||
self,
|
||||
*,
|
||||
reactor: IReactorCore,
|
||||
proxy_reactor: IReactorCore | None = None,
|
||||
contextFactory: IPolicyForHTTPS | None = None,
|
||||
proxy_reactor: Optional[IReactorCore] = None,
|
||||
contextFactory: Optional[IPolicyForHTTPS] = None,
|
||||
connectTimeout: float | None = None,
|
||||
bindAddress: bytes | None = None,
|
||||
pool: HTTPConnectionPool | None = None,
|
||||
@@ -175,7 +175,7 @@ class ProxyAgent(_AgentBase):
|
||||
self._policy_for_https = contextFactory
|
||||
self._reactor = cast(IReactorTime, reactor)
|
||||
|
||||
self._federation_proxy_endpoint: IStreamClientEndpoint | None = None
|
||||
self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None
|
||||
self._federation_proxy_credentials: ProxyCredentials | None = None
|
||||
if federation_proxy_locations:
|
||||
assert federation_proxy_credentials is not None, (
|
||||
@@ -221,7 +221,7 @@ class ProxyAgent(_AgentBase):
|
||||
method: bytes,
|
||||
uri: bytes,
|
||||
headers: Headers | None = None,
|
||||
bodyProducer: IBodyProducer | None = None,
|
||||
bodyProducer: Optional[IBodyProducer] = None,
|
||||
) -> "defer.Deferred[IResponse]":
|
||||
"""
|
||||
Issue a request to the server indicated by the given uri.
|
||||
@@ -365,11 +365,11 @@ class ProxyAgent(_AgentBase):
|
||||
def http_proxy_endpoint(
|
||||
proxy: bytes | None,
|
||||
reactor: IReactorCore,
|
||||
tls_options_factory: IPolicyForHTTPS | None,
|
||||
tls_options_factory: Optional[IPolicyForHTTPS],
|
||||
timeout: float = 30,
|
||||
bindAddress: bytes | str | tuple[bytes | str, int] | None = None,
|
||||
attemptDelay: float | None = None,
|
||||
) -> tuple[IStreamClientEndpoint | None, ProxyCredentials | None]:
|
||||
) -> tuple[Optional[IStreamClientEndpoint], ProxyCredentials | None]:
|
||||
"""Parses an http proxy setting and returns an endpoint for the proxy
|
||||
|
||||
Args:
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from zope.interface import implementer
|
||||
|
||||
@@ -149,7 +150,7 @@ class ReplicationAgent(_AgentBase):
|
||||
method: bytes,
|
||||
uri: bytes,
|
||||
headers: Headers | None = None,
|
||||
bodyProducer: IBodyProducer | None = None,
|
||||
bodyProducer: Optional[IBodyProducer] = None,
|
||||
) -> "defer.Deferred[IResponse]":
|
||||
"""
|
||||
Issue a request to the server indicated by the given uri.
|
||||
|
||||
@@ -25,7 +25,7 @@ import traceback
|
||||
from collections import deque
|
||||
from ipaddress import IPv4Address, IPv6Address, ip_address
|
||||
from math import floor
|
||||
from typing import Callable
|
||||
from typing import Callable, Optional
|
||||
|
||||
import attr
|
||||
from zope.interface import implementer
|
||||
@@ -113,7 +113,7 @@ class RemoteHandler(logging.Handler):
|
||||
port: int,
|
||||
maximum_buffer: int = 1000,
|
||||
level: int = logging.NOTSET,
|
||||
_reactor: IReactorTime | None = None,
|
||||
_reactor: Optional[IReactorTime] = None,
|
||||
):
|
||||
super().__init__(level=level)
|
||||
self.host = host
|
||||
|
||||
@@ -3,7 +3,7 @@ import time
|
||||
from logging import Handler, LogRecord
|
||||
from logging.handlers import MemoryHandler
|
||||
from threading import Thread
|
||||
from typing import cast
|
||||
from typing import Optional, cast
|
||||
|
||||
from twisted.internet.interfaces import IReactorCore
|
||||
|
||||
@@ -26,7 +26,7 @@ class PeriodicallyFlushingMemoryHandler(MemoryHandler):
|
||||
target: Handler | None = None,
|
||||
flushOnClose: bool = True,
|
||||
period: float = 5.0,
|
||||
reactor: IReactorCore | None = None,
|
||||
reactor: Optional[IReactorCore] = None,
|
||||
) -> None:
|
||||
"""
|
||||
period: the period between automatic flushes
|
||||
|
||||
@@ -30,6 +30,7 @@ from typing import (
|
||||
Awaitable,
|
||||
BinaryIO,
|
||||
Generator,
|
||||
Optional,
|
||||
)
|
||||
|
||||
import attr
|
||||
@@ -705,7 +706,7 @@ class ThreadedFileSender:
|
||||
|
||||
self.file: BinaryIO | None = None
|
||||
self.deferred: "Deferred[None]" = Deferred()
|
||||
self.consumer: interfaces.IConsumer | None = None
|
||||
self.consumer: Optional[IConsumer] = None
|
||||
|
||||
# Signals if the thread should keep reading/sending data. Set means
|
||||
# continue, clear means pause.
|
||||
|
||||
@@ -439,7 +439,11 @@ class MediaRepository:
|
||||
return await self.store.get_cached_remote_media(origin, media_id)
|
||||
|
||||
async def get_local_media_info(
|
||||
self, request: SynapseRequest, media_id: str, max_timeout_ms: int
|
||||
self,
|
||||
request: SynapseRequest,
|
||||
media_id: str,
|
||||
max_timeout_ms: int,
|
||||
bypass_quarantine: bool = False,
|
||||
) -> LocalMedia | None:
|
||||
"""Gets the info dictionary for given local media ID. If the media has
|
||||
not been uploaded yet, this function will wait up to ``max_timeout_ms``
|
||||
@@ -451,6 +455,7 @@ class MediaRepository:
|
||||
the file_id for local content.)
|
||||
max_timeout_ms: the maximum number of milliseconds to wait for the
|
||||
media to be uploaded.
|
||||
bypass_quarantine: whether to bypass quarantine checks
|
||||
|
||||
Returns:
|
||||
Either the info dictionary for the given local media ID or
|
||||
@@ -466,7 +471,7 @@ class MediaRepository:
|
||||
respond_404(request)
|
||||
return None
|
||||
|
||||
if media_info.quarantined_by:
|
||||
if media_info.quarantined_by and not bypass_quarantine:
|
||||
logger.info("Media %s is quarantined", media_id)
|
||||
respond_404(request)
|
||||
return None
|
||||
@@ -500,6 +505,7 @@ class MediaRepository:
|
||||
max_timeout_ms: int,
|
||||
allow_authenticated: bool = True,
|
||||
federation: bool = False,
|
||||
bypass_quarantine: bool = False,
|
||||
) -> None:
|
||||
"""Responds to requests for local media, if exists, or returns 404.
|
||||
|
||||
@@ -513,11 +519,14 @@ class MediaRepository:
|
||||
media to be uploaded.
|
||||
allow_authenticated: whether media marked as authenticated may be served to this request
|
||||
federation: whether the local media being fetched is for a federation request
|
||||
bypass_quarantine: whether to bypass quarantine checks
|
||||
|
||||
Returns:
|
||||
Resolves once a response has successfully been written to request
|
||||
"""
|
||||
media_info = await self.get_local_media_info(request, media_id, max_timeout_ms)
|
||||
media_info = await self.get_local_media_info(
|
||||
request, media_id, max_timeout_ms, bypass_quarantine=bypass_quarantine
|
||||
)
|
||||
if not media_info:
|
||||
return
|
||||
|
||||
@@ -561,6 +570,7 @@ class MediaRepository:
|
||||
ip_address: str,
|
||||
use_federation_endpoint: bool,
|
||||
allow_authenticated: bool = True,
|
||||
bypass_quarantine: bool = False,
|
||||
) -> None:
|
||||
"""Respond to requests for remote media.
|
||||
|
||||
@@ -577,6 +587,7 @@ class MediaRepository:
|
||||
federation `/download` endpoint
|
||||
allow_authenticated: whether media marked as authenticated may be served to this
|
||||
request
|
||||
bypass_quarantine: whether to bypass quarantine checks
|
||||
|
||||
Returns:
|
||||
Resolves once a response has successfully been written to request
|
||||
@@ -609,6 +620,7 @@ class MediaRepository:
|
||||
ip_address,
|
||||
use_federation_endpoint,
|
||||
allow_authenticated,
|
||||
bypass_quarantine=bypass_quarantine,
|
||||
)
|
||||
|
||||
# Check if the media is cached on the client, if so return 304. We need
|
||||
@@ -697,6 +709,7 @@ class MediaRepository:
|
||||
ip_address: str,
|
||||
use_federation_endpoint: bool,
|
||||
allow_authenticated: bool,
|
||||
bypass_quarantine: bool = False,
|
||||
) -> tuple[Responder | None, RemoteMedia]:
|
||||
"""Looks for media in local cache, if not there then attempt to
|
||||
download from remote server.
|
||||
@@ -712,6 +725,7 @@ class MediaRepository:
|
||||
ip_address: the IP address of the requester
|
||||
use_federation_endpoint: whether to request the remote media over the new federation
|
||||
/download endpoint
|
||||
bypass_quarantine: whether to bypass quarantine checks
|
||||
|
||||
Returns:
|
||||
A tuple of responder and the media info of the file.
|
||||
@@ -732,7 +746,7 @@ class MediaRepository:
|
||||
file_id = media_info.filesystem_id
|
||||
file_info = FileInfo(server_name, file_id)
|
||||
|
||||
if media_info.quarantined_by:
|
||||
if media_info.quarantined_by and not bypass_quarantine:
|
||||
logger.info("Media is quarantined")
|
||||
raise NotFoundError()
|
||||
|
||||
@@ -914,6 +928,7 @@ class MediaRepository:
|
||||
filesystem_id=file_id,
|
||||
last_access_ts=time_now_ms,
|
||||
quarantined_by=None,
|
||||
quarantined_ts=None,
|
||||
authenticated=authenticated,
|
||||
sha256=sha256writer.hexdigest(),
|
||||
)
|
||||
@@ -1047,6 +1062,7 @@ class MediaRepository:
|
||||
filesystem_id=file_id,
|
||||
last_access_ts=time_now_ms,
|
||||
quarantined_by=None,
|
||||
quarantined_ts=None,
|
||||
authenticated=authenticated,
|
||||
sha256=sha256writer.hexdigest(),
|
||||
)
|
||||
|
||||
@@ -331,10 +331,16 @@ class UrlPreviewer:
|
||||
# response failed or is incomplete.
|
||||
og_from_html = parse_html_to_open_graph(tree)
|
||||
|
||||
# Compile the Open Graph response by using the scraped
|
||||
# information from the HTML and overlaying any information
|
||||
# from the oEmbed response.
|
||||
og = {**og_from_html, **og_from_oembed}
|
||||
# Compile an Open Graph response by combining the oEmbed response
|
||||
# and the information from the HTML, with information in the HTML
|
||||
# preferred.
|
||||
#
|
||||
# The ordering here is intentional: certain websites (especially
|
||||
# SPA JavaScript-based ones) including Mastodon and YouTube provide
|
||||
# almost complete OpenGraph descriptions but only stubs for oEmbed,
|
||||
# with further oEmbed information being populated with JavaScript,
|
||||
# that Synapse won't execute.
|
||||
og = og_from_oembed | og_from_html
|
||||
|
||||
await self._precache_image_url(user, media_info, og)
|
||||
else:
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
@@ -71,7 +71,7 @@ class EmailPusher(Pusher):
|
||||
self.server_name = hs.hostname
|
||||
self.store = self.hs.get_datastores().main
|
||||
self.email = pusher_config.pushkey
|
||||
self.timed_call: IDelayedCall | None = None
|
||||
self.timed_call: Optional[IDelayedCall] = None
|
||||
self.throttle_params: dict[str, ThrottleParams] = {}
|
||||
self._inited = False
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
import logging
|
||||
import random
|
||||
import urllib.parse
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
@@ -120,7 +120,7 @@ class HttpPusher(Pusher):
|
||||
self.data = pusher_config.data
|
||||
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
||||
self.failing_since = pusher_config.failing_since
|
||||
self.timed_call: IDelayedCall | None = None
|
||||
self.timed_call: Optional[IDelayedCall] = None
|
||||
self._is_processing = False
|
||||
self._group_unread_count_by_room = (
|
||||
hs.config.push.push_group_unread_count_by_room
|
||||
|
||||
@@ -114,10 +114,12 @@ from synapse.rest.admin.users import (
|
||||
UserByThreePid,
|
||||
UserInvitesCount,
|
||||
UserJoinedRoomCount,
|
||||
UserMembershipRestServlet,
|
||||
UserJoinedRoomsRestServlet,
|
||||
UserMembershipsRestServlet,
|
||||
UserRegisterServlet,
|
||||
UserReplaceMasterCrossSigningKeyRestServlet,
|
||||
UserRestServletV2,
|
||||
UserRestServletV2Get,
|
||||
UsersRestServletV2,
|
||||
UsersRestServletV3,
|
||||
UserTokenRestServlet,
|
||||
@@ -280,6 +282,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
# matrix_authentication_service integration uses the dedicated MAS API.
|
||||
if hs.config.experimental.msc3861.enabled:
|
||||
register_servlets_for_msc3861_delegation(hs, http_server)
|
||||
else:
|
||||
UserRestServletV2Get(hs).register(http_server)
|
||||
|
||||
return
|
||||
|
||||
@@ -297,7 +301,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
VersionServlet(hs).register(http_server)
|
||||
if not auth_delegated:
|
||||
UserAdminServlet(hs).register(http_server)
|
||||
UserMembershipRestServlet(hs).register(http_server)
|
||||
UserJoinedRoomsRestServlet(hs).register(http_server)
|
||||
UserMembershipsRestServlet(hs).register(http_server)
|
||||
if not auth_delegated:
|
||||
UserTokenRestServlet(hs).register(http_server)
|
||||
UserRestServletV2(hs).register(http_server)
|
||||
|
||||
@@ -293,6 +293,38 @@ class ListMediaInRoom(RestServlet):
|
||||
return HTTPStatus.OK, {"local": local_mxcs, "remote": remote_mxcs}
|
||||
|
||||
|
||||
class ListQuarantinedMedia(RestServlet):
|
||||
"""Lists all quarantined media on the server."""
|
||||
|
||||
PATTERNS = admin_patterns("/media/quarantined$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastores().main
|
||||
self.auth = hs.get_auth()
|
||||
|
||||
async def on_GET(
|
||||
self,
|
||||
request: SynapseRequest,
|
||||
) -> tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
start = parse_integer(request, "from", default=0)
|
||||
limit = parse_integer(request, "limit", default=100)
|
||||
local_or_remote = parse_string(request, "kind", required=True)
|
||||
|
||||
if local_or_remote not in ["local", "remote"]:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Query parameter `kind` must be either 'local' or 'remote'.",
|
||||
)
|
||||
|
||||
mxcs = await self.store.get_quarantined_media_mxcs(
|
||||
start, limit, local_or_remote == "local"
|
||||
)
|
||||
|
||||
return HTTPStatus.OK, {"media": mxcs}
|
||||
|
||||
|
||||
class PurgeMediaCacheRestServlet(RestServlet):
|
||||
PATTERNS = admin_patterns("/purge_media_cache$")
|
||||
|
||||
@@ -532,6 +564,7 @@ def register_servlets_for_media_repo(hs: "HomeServer", http_server: HttpServer)
|
||||
ProtectMediaByID(hs).register(http_server)
|
||||
UnprotectMediaByID(hs).register(http_server)
|
||||
ListMediaInRoom(hs).register(http_server)
|
||||
ListQuarantinedMedia(hs).register(http_server)
|
||||
# XXX DeleteMediaByDateSize must be registered before DeleteMediaByID as
|
||||
# their URL routes overlap.
|
||||
DeleteMediaByDateSize(hs).register(http_server)
|
||||
|
||||
@@ -210,7 +210,7 @@ class UsersRestServletV3(UsersRestServletV2):
|
||||
return parse_boolean(request, "deactivated")
|
||||
|
||||
|
||||
class UserRestServletV2(RestServlet):
|
||||
class UserRestServletV2Get(RestServlet):
|
||||
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)$", "v2")
|
||||
|
||||
"""Get request to list user details.
|
||||
@@ -220,22 +220,6 @@ class UserRestServletV2(RestServlet):
|
||||
|
||||
returns:
|
||||
200 OK with user details if success otherwise an error.
|
||||
|
||||
Put request to allow an administrator to add or modify a user.
|
||||
This needs user to have administrator access in Synapse.
|
||||
We use PUT instead of POST since we already know the id of the user
|
||||
object to create. POST could be used to create guests.
|
||||
|
||||
PUT /_synapse/admin/v2/users/<user_id>
|
||||
{
|
||||
"password": "secret",
|
||||
"displayname": "User"
|
||||
}
|
||||
|
||||
returns:
|
||||
201 OK with new user object if user was created or
|
||||
200 OK with modified user object if user was modified
|
||||
otherwise an error.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
@@ -267,6 +251,28 @@ class UserRestServletV2(RestServlet):
|
||||
|
||||
return HTTPStatus.OK, user_info_dict
|
||||
|
||||
|
||||
class UserRestServletV2(UserRestServletV2Get):
|
||||
"""
|
||||
Put request to allow an administrator to add or modify a user.
|
||||
This needs user to have administrator access in Synapse.
|
||||
We use PUT instead of POST since we already know the id of the user
|
||||
object to create. POST could be used to create guests.
|
||||
|
||||
Note: This inherits from `UserRestServletV2Get`, so also supports the `GET` route.
|
||||
|
||||
PUT /_synapse/admin/v2/users/<user_id>
|
||||
{
|
||||
"password": "secret",
|
||||
"displayname": "User"
|
||||
}
|
||||
|
||||
returns:
|
||||
201 OK with new user object if user was created or
|
||||
200 OK with modified user object if user was modified
|
||||
otherwise an error.
|
||||
"""
|
||||
|
||||
async def on_PUT(
|
||||
self, request: SynapseRequest, user_id: str
|
||||
) -> tuple[int, JsonMapping]:
|
||||
@@ -1031,7 +1037,7 @@ class UserAdminServlet(RestServlet):
|
||||
return HTTPStatus.OK, {}
|
||||
|
||||
|
||||
class UserMembershipRestServlet(RestServlet):
|
||||
class UserJoinedRoomsRestServlet(RestServlet):
|
||||
"""
|
||||
Get list of joined room ID's for a user.
|
||||
"""
|
||||
@@ -1054,6 +1060,28 @@ class UserMembershipRestServlet(RestServlet):
|
||||
return HTTPStatus.OK, rooms_response
|
||||
|
||||
|
||||
class UserMembershipsRestServlet(RestServlet):
|
||||
"""
|
||||
Get list of room memberships for a user.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/memberships$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.is_mine = hs.is_mine
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, user_id: str
|
||||
) -> tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
memberships = await self.store.get_memberships_for_user(user_id)
|
||||
|
||||
return HTTPStatus.OK, {"memberships": memberships}
|
||||
|
||||
|
||||
class PushersRestServlet(RestServlet):
|
||||
"""
|
||||
Gets information about all pushers for a specific `user_id`.
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
import logging
|
||||
import re
|
||||
|
||||
from synapse.api.errors import Codes, cs_error
|
||||
from synapse.http.server import (
|
||||
HttpServer,
|
||||
respond_with_json,
|
||||
@@ -235,7 +236,23 @@ class DownloadResource(RestServlet):
|
||||
# Validate the server name, raising if invalid
|
||||
parse_and_validate_server_name(server_name)
|
||||
|
||||
await self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
||||
is_admin = await self.auth.is_server_admin(requester)
|
||||
bypass_quarantine = False
|
||||
if parse_string(request, "admin_unsafely_bypass_quarantine") == "true":
|
||||
if is_admin:
|
||||
logger.info("Admin bypassing quarantine for media download")
|
||||
bypass_quarantine = True
|
||||
else:
|
||||
respond_with_json(
|
||||
request,
|
||||
400,
|
||||
cs_error(
|
||||
"Must be a server admin to bypass quarantine",
|
||||
code=Codes.UNKNOWN,
|
||||
),
|
||||
send_cors=True,
|
||||
)
|
||||
|
||||
set_cors_headers(request)
|
||||
set_corp_headers(request)
|
||||
@@ -259,7 +276,11 @@ class DownloadResource(RestServlet):
|
||||
|
||||
if self._is_mine_server_name(server_name):
|
||||
await self.media_repo.get_local_media(
|
||||
request, media_id, file_name, max_timeout_ms
|
||||
request,
|
||||
media_id,
|
||||
file_name,
|
||||
max_timeout_ms,
|
||||
bypass_quarantine=bypass_quarantine,
|
||||
)
|
||||
else:
|
||||
ip_address = request.getClientAddress().host
|
||||
@@ -271,6 +292,7 @@ class DownloadResource(RestServlet):
|
||||
max_timeout_ms,
|
||||
ip_address,
|
||||
True,
|
||||
bypass_quarantine=bypass_quarantine,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -19,9 +19,12 @@
|
||||
#
|
||||
#
|
||||
import logging
|
||||
from bisect import bisect
|
||||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from unpaddedbase64 import decode_base64, encode_base64
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import RestServlet, parse_strings_from_args
|
||||
@@ -35,10 +38,34 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MUTUAL_ROOMS_BATCH_LIMIT = 100
|
||||
|
||||
|
||||
def _parse_mutual_rooms_batch_token_args(args: dict[bytes, list[bytes]]) -> str | None:
|
||||
from_batches = parse_strings_from_args(args, "from")
|
||||
if not from_batches:
|
||||
return None
|
||||
if len(from_batches) > 1:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Duplicate from query parameter",
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
if from_batches[0]:
|
||||
try:
|
||||
return decode_base64(from_batches[0]).decode("utf-8")
|
||||
except Exception:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Malformed from token",
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
class UserMutualRoomsServlet(RestServlet):
|
||||
"""
|
||||
GET /uk.half-shot.msc2666/user/mutual_rooms?user_id={user_id} HTTP/1.1
|
||||
GET /uk.half-shot.msc2666/user/mutual_rooms?user_id={user_id}&from={token} HTTP/1.1
|
||||
"""
|
||||
|
||||
PATTERNS = client_patterns(
|
||||
@@ -56,6 +83,7 @@ class UserMutualRoomsServlet(RestServlet):
|
||||
args: dict[bytes, list[bytes]] = request.args # type: ignore
|
||||
|
||||
user_ids = parse_strings_from_args(args, "user_id", required=True)
|
||||
from_batch = _parse_mutual_rooms_batch_token_args(args)
|
||||
|
||||
if len(user_ids) > 1:
|
||||
raise SynapseError(
|
||||
@@ -64,29 +92,52 @@ class UserMutualRoomsServlet(RestServlet):
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
|
||||
# We don't do batching, so a batch token is illegal by default
|
||||
if b"batch_token" in args:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Unknown batch_token",
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
|
||||
user_id = user_ids[0]
|
||||
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
if user_id == requester.user.to_string():
|
||||
raise SynapseError(
|
||||
HTTPStatus.UNPROCESSABLE_ENTITY,
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"You cannot request a list of shared rooms with yourself",
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
errcode=Codes.UNKNOWN,
|
||||
)
|
||||
|
||||
rooms = await self.store.get_mutual_rooms_between_users(
|
||||
frozenset((requester.user.to_string(), user_id))
|
||||
# Sort here instead of the database function, so that we don't expose
|
||||
# clients to any unrelated changes to the sorting algorithm.
|
||||
rooms = sorted(
|
||||
await self.store.get_mutual_rooms_between_users(
|
||||
frozenset((requester.user.to_string(), user_id))
|
||||
)
|
||||
)
|
||||
|
||||
return 200, {"joined": list(rooms)}
|
||||
if from_batch:
|
||||
# A from_batch token was provided, so cut off any rooms where the ID is
|
||||
# lower than or equal to the token. This method doesn't care whether the
|
||||
# provided token room still exists, nor whether it's even a real room ID.
|
||||
#
|
||||
# However, if rooms with a lower ID are added after the token was issued,
|
||||
# they will not be included until the client makes a new request without a
|
||||
# from token. This is considered acceptable, as clients generally won't
|
||||
# persist these results for long periods.
|
||||
rooms = rooms[bisect(rooms, from_batch) :]
|
||||
|
||||
if len(rooms) <= MUTUAL_ROOMS_BATCH_LIMIT:
|
||||
# We've reached the end of the list, don't return a batch token
|
||||
return 200, {"joined": rooms}
|
||||
|
||||
rooms = rooms[:MUTUAL_ROOMS_BATCH_LIMIT]
|
||||
# We use urlsafe unpadded base64 encoding for the batch token in order to
|
||||
# handle funny room IDs in old pre-v12 rooms properly. We also truncate it
|
||||
# to stay within the 255-character limit of opaque tokens.
|
||||
next_batch = encode_base64(rooms[-1].encode("utf-8"), urlsafe=True)[:255]
|
||||
# Due to the truncation, it is technically possible to have conflicting next
|
||||
# batches by creating hundreds of rooms with the same 191 character prefix
|
||||
# in the room ID. In the event that some silly user does that, don't let
|
||||
# them paginate further.
|
||||
if next_batch == from_batch:
|
||||
return 200, {"joined": rooms}
|
||||
|
||||
return 200, {"joined": list(rooms), "next_batch": next_batch}
|
||||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
|
||||
@@ -34,6 +34,7 @@ from typing import (
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Optional,
|
||||
TypeVar,
|
||||
cast,
|
||||
)
|
||||
@@ -320,7 +321,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
self,
|
||||
hostname: str,
|
||||
config: HomeServerConfig,
|
||||
reactor: ISynapseReactor | None = None,
|
||||
reactor: Optional[ISynapseReactor] = None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
@@ -353,7 +354,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
self._module_web_resources_consumed = False
|
||||
|
||||
# This attribute is set by the free function `refresh_certificate`.
|
||||
self.tls_server_context_factory: IOpenSSLContextFactory | None = None
|
||||
self.tls_server_context_factory: Optional[IOpenSSLContextFactory] = None
|
||||
|
||||
self._is_shutdown = False
|
||||
self._async_shutdown_handlers: list[ShutdownInfo] = []
|
||||
|
||||
@@ -61,6 +61,7 @@ class LocalMedia:
|
||||
url_cache: str | None
|
||||
last_access_ts: int
|
||||
quarantined_by: str | None
|
||||
quarantined_ts: int | None
|
||||
safe_from_quarantine: bool
|
||||
user_id: str | None
|
||||
authenticated: bool | None
|
||||
@@ -78,6 +79,7 @@ class RemoteMedia:
|
||||
created_ts: int
|
||||
last_access_ts: int
|
||||
quarantined_by: str | None
|
||||
quarantined_ts: int | None
|
||||
authenticated: bool | None
|
||||
sha256: str | None
|
||||
|
||||
@@ -243,6 +245,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
"user_id",
|
||||
"authenticated",
|
||||
"sha256",
|
||||
"quarantined_ts",
|
||||
),
|
||||
allow_none=True,
|
||||
desc="get_local_media",
|
||||
@@ -262,6 +265,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
user_id=row[8],
|
||||
authenticated=row[9],
|
||||
sha256=row[10],
|
||||
quarantined_ts=row[11],
|
||||
)
|
||||
|
||||
async def get_local_media_by_user_paginate(
|
||||
@@ -319,7 +323,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
safe_from_quarantine,
|
||||
user_id,
|
||||
authenticated,
|
||||
sha256
|
||||
sha256,
|
||||
quarantined_ts
|
||||
FROM local_media_repository
|
||||
WHERE user_id = ?
|
||||
ORDER BY {order_by_column} {order}, media_id ASC
|
||||
@@ -345,6 +350,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
user_id=row[9],
|
||||
authenticated=row[10],
|
||||
sha256=row[11],
|
||||
quarantined_ts=row[12],
|
||||
)
|
||||
for row in txn
|
||||
]
|
||||
@@ -695,6 +701,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
"quarantined_by",
|
||||
"authenticated",
|
||||
"sha256",
|
||||
"quarantined_ts",
|
||||
),
|
||||
allow_none=True,
|
||||
desc="get_cached_remote_media",
|
||||
@@ -713,6 +720,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
quarantined_by=row[6],
|
||||
authenticated=row[7],
|
||||
sha256=row[8],
|
||||
quarantined_ts=row[9],
|
||||
)
|
||||
|
||||
async def store_cached_remote_media(
|
||||
|
||||
@@ -945,6 +945,50 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
max_lifetime=max_lifetime,
|
||||
)
|
||||
|
||||
async def get_quarantined_media_mxcs(
|
||||
self, index_start: int, index_limit: int, local: bool
|
||||
) -> list[str]:
|
||||
"""Retrieves all the quarantined media MXC URIs starting from the given position,
|
||||
ordered from oldest quarantined timestamp, then alphabetically by media ID
|
||||
(including origin).
|
||||
|
||||
Note that on established servers the "quarantined timestamp" may be zero due to
|
||||
being introduced after the quarantine timestamp field was introduced.
|
||||
|
||||
Args:
|
||||
index_start: The position to start from.
|
||||
index_limit: The maximum number of results to return.
|
||||
local: When true, only local media will be returned. When false, only remote media will be returned.
|
||||
|
||||
Returns:
|
||||
The quarantined media as a list of media IDs.
|
||||
"""
|
||||
|
||||
def _get_quarantined_media_mxcs_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> list[str]:
|
||||
# We order by quarantined timestamp *and* media ID (including origin, when
|
||||
# known) to ensure the ordering is stable for established servers.
|
||||
if local:
|
||||
sql = "SELECT '' as media_origin, media_id FROM local_media_repository WHERE quarantined_by IS NOT NULL ORDER BY quarantined_ts, media_id ASC LIMIT ? OFFSET ?"
|
||||
else:
|
||||
sql = "SELECT media_origin, media_id FROM remote_media_cache WHERE quarantined_by IS NOT NULL ORDER BY quarantined_ts, media_origin, media_id ASC LIMIT ? OFFSET ?"
|
||||
txn.execute(sql, (index_limit, index_start))
|
||||
|
||||
mxcs = []
|
||||
|
||||
for media_origin, media_id in txn:
|
||||
if local:
|
||||
media_origin = self.hs.hostname
|
||||
mxcs.append(f"mxc://{media_origin}/{media_id}")
|
||||
|
||||
return mxcs
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_quarantined_media_mxcs",
|
||||
_get_quarantined_media_mxcs_txn,
|
||||
)
|
||||
|
||||
async def get_media_mxcs_in_room(self, room_id: str) -> tuple[list[str], list[str]]:
|
||||
"""Retrieves all the local and remote media MXC URIs in a given room
|
||||
|
||||
@@ -952,7 +996,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
room_id
|
||||
|
||||
Returns:
|
||||
The local and remote media as a lists of the media IDs.
|
||||
The local and remote media as lists of the media IDs.
|
||||
"""
|
||||
|
||||
def _get_media_mxcs_in_room_txn(
|
||||
@@ -1147,6 +1191,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
The total number of media items quarantined
|
||||
"""
|
||||
total_media_quarantined = 0
|
||||
now_ts: int | None = self.clock.time_msec()
|
||||
|
||||
if quarantined_by is None:
|
||||
now_ts = None
|
||||
|
||||
# Effectively a legacy path, update any media that was explicitly named.
|
||||
if media_ids:
|
||||
@@ -1155,13 +1203,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
)
|
||||
sql = f"""
|
||||
UPDATE local_media_repository
|
||||
SET quarantined_by = ?
|
||||
SET quarantined_by = ?, quarantined_ts = ?
|
||||
WHERE {sql_many_clause_sql}"""
|
||||
|
||||
if quarantined_by is not None:
|
||||
sql += " AND safe_from_quarantine = FALSE"
|
||||
|
||||
txn.execute(sql, [quarantined_by] + sql_many_clause_args)
|
||||
txn.execute(sql, [quarantined_by, now_ts] + sql_many_clause_args)
|
||||
# Note that a rowcount of -1 can be used to indicate no rows were affected.
|
||||
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
|
||||
|
||||
@@ -1172,13 +1220,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
)
|
||||
sql = f"""
|
||||
UPDATE local_media_repository
|
||||
SET quarantined_by = ?
|
||||
SET quarantined_by = ?, quarantined_ts = ?
|
||||
WHERE {sql_many_clause_sql}"""
|
||||
|
||||
if quarantined_by is not None:
|
||||
sql += " AND safe_from_quarantine = FALSE"
|
||||
|
||||
txn.execute(sql, [quarantined_by] + sql_many_clause_args)
|
||||
txn.execute(sql, [quarantined_by, now_ts] + sql_many_clause_args)
|
||||
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
|
||||
|
||||
return total_media_quarantined
|
||||
@@ -1202,6 +1250,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
The total number of media items quarantined
|
||||
"""
|
||||
total_media_quarantined = 0
|
||||
now_ts: int | None = self.clock.time_msec()
|
||||
|
||||
if quarantined_by is None:
|
||||
now_ts = None
|
||||
|
||||
if media:
|
||||
sql_in_list_clause, sql_args = make_tuple_in_list_sql_clause(
|
||||
@@ -1211,10 +1263,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
)
|
||||
sql = f"""
|
||||
UPDATE remote_media_cache
|
||||
SET quarantined_by = ?
|
||||
SET quarantined_by = ?, quarantined_ts = ?
|
||||
WHERE {sql_in_list_clause}"""
|
||||
|
||||
txn.execute(sql, [quarantined_by] + sql_args)
|
||||
txn.execute(sql, [quarantined_by, now_ts] + sql_args)
|
||||
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
|
||||
|
||||
total_media_quarantined = 0
|
||||
@@ -1224,9 +1276,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
)
|
||||
sql = f"""
|
||||
UPDATE remote_media_cache
|
||||
SET quarantined_by = ?
|
||||
SET quarantined_by = ?, quarantined_ts = ?
|
||||
WHERE {sql_many_clause_sql}"""
|
||||
txn.execute(sql, [quarantined_by] + sql_many_clause_args)
|
||||
txn.execute(sql, [quarantined_by, now_ts] + sql_many_clause_args)
|
||||
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
|
||||
|
||||
return total_media_quarantined
|
||||
|
||||
@@ -747,6 +747,27 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
|
||||
return frozenset(room_ids)
|
||||
|
||||
async def get_memberships_for_user(self, user_id: str) -> dict[str, str]:
|
||||
"""Returns a dict of room_id to membership state for a given user.
|
||||
|
||||
If a remote user only returns rooms this server is currently
|
||||
participating in.
|
||||
"""
|
||||
|
||||
rows = cast(
|
||||
list[tuple[str, str]],
|
||||
await self.db_pool.simple_select_list(
|
||||
"current_state_events",
|
||||
keyvalues={
|
||||
"type": EventTypes.Member,
|
||||
"state_key": user_id,
|
||||
},
|
||||
retcols=["room_id", "membership"],
|
||||
desc="get_memberships_for_user",
|
||||
),
|
||||
)
|
||||
return dict(rows)
|
||||
|
||||
@cached(max_entries=500000, iterable=True)
|
||||
async def get_rooms_for_user(self, user_id: str) -> frozenset[str]:
|
||||
"""Returns a set of room_ids the user is currently joined to.
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Mapping, cast
|
||||
from typing import TYPE_CHECKING, AbstractSet, Mapping, cast
|
||||
|
||||
import attr
|
||||
|
||||
@@ -26,13 +26,16 @@ from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
make_in_list_sql_clause,
|
||||
)
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.types import MultiWriterStreamToken, RoomStreamToken
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
HaveSentRoom,
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
PerConnectionState,
|
||||
RoomLazyMembershipChanges,
|
||||
RoomStatusMap,
|
||||
RoomSyncConfig,
|
||||
)
|
||||
@@ -373,6 +376,13 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
value_values=values,
|
||||
)
|
||||
|
||||
self._persist_sliding_sync_connection_lazy_members_txn(
|
||||
txn,
|
||||
connection_key,
|
||||
connection_position,
|
||||
per_connection_state.room_lazy_membership,
|
||||
)
|
||||
|
||||
return connection_position
|
||||
|
||||
@cached(iterable=True, max_entries=100000)
|
||||
@@ -446,6 +456,23 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
"""
|
||||
txn.execute(sql, (connection_key, connection_position))
|
||||
|
||||
# Move any lazy membership entries for this connection position to have
|
||||
# `NULL` connection position, indicating that it applies to all future
|
||||
# positions on this connection. This is safe because we have deleted all
|
||||
# other (potentially forked) connection positions, and so all future
|
||||
# positions in this connection will be a continuation of the current
|
||||
# position. Thus any lazy membership entries we have sent down will still
|
||||
# be valid.
|
||||
self.db_pool.simple_update_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_lazy_members",
|
||||
keyvalues={
|
||||
"connection_key": connection_key,
|
||||
"connection_position": connection_position,
|
||||
},
|
||||
updatevalues={"connection_position": None},
|
||||
)
|
||||
|
||||
# Fetch and create a mapping from required state ID to the actual
|
||||
# required state for the connection.
|
||||
rows = self.db_pool.simple_select_list_txn(
|
||||
@@ -525,8 +552,153 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
receipts=RoomStatusMap(receipts),
|
||||
account_data=RoomStatusMap(account_data),
|
||||
room_configs=room_configs,
|
||||
room_lazy_membership={},
|
||||
)
|
||||
|
||||
async def get_sliding_sync_connection_lazy_members(
|
||||
self,
|
||||
connection_position: int,
|
||||
room_id: str,
|
||||
user_ids: AbstractSet[str],
|
||||
) -> Mapping[str, int]:
|
||||
"""Get which user IDs in the room we have previously sent lazy
|
||||
membership for.
|
||||
|
||||
Args:
|
||||
connection_position: The sliding sync connection position.
|
||||
room_id: The room ID to get lazy members for.
|
||||
user_ids: The user IDs to check whether we've previously sent
|
||||
because of lazy membership.
|
||||
|
||||
Returns:
|
||||
The mapping of user IDs to the last seen timestamp for those user
|
||||
IDs. Only includes user IDs that we have previously sent lazy
|
||||
membership for, and so may be a subset of the `user_ids` passed in.
|
||||
"""
|
||||
|
||||
def get_sliding_sync_connection_lazy_members_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Mapping[str, int]:
|
||||
user_clause, user_args = make_in_list_sql_clause(
|
||||
txn.database_engine, "user_id", user_ids
|
||||
)
|
||||
|
||||
# Fetch all the lazy membership entries for the given connection,
|
||||
# room and user IDs. We don't have the `connection_key` here, so we
|
||||
# join against `sliding_sync_connection_positions` to get it.
|
||||
#
|
||||
# Beware that there are two `connection_position` columns in the
|
||||
# query which are different, the one in
|
||||
# `sliding_sync_connection_positions` is the one we match to get the
|
||||
# connection_key, whereas the one in
|
||||
# `sliding_sync_connection_lazy_members` is what we filter against
|
||||
# (it may be null or the same as the one passed in).
|
||||
#
|
||||
# FIXME: We should pass in `connection_key` here to avoid the join.
|
||||
# We don't do this currently as the caller doesn't have it handy.
|
||||
sql = f"""
|
||||
SELECT user_id, members.connection_position, last_seen_ts
|
||||
FROM sliding_sync_connection_lazy_members AS members
|
||||
INNER JOIN sliding_sync_connection_positions AS pos USING (connection_key)
|
||||
WHERE pos.connection_position = ? AND room_id = ? AND {user_clause}
|
||||
"""
|
||||
|
||||
txn.execute(sql, (connection_position, room_id, *user_args))
|
||||
|
||||
# Filter out any cache entries that only apply to forked connection
|
||||
# positions. Entries with `NULL` `connection_position` apply to all
|
||||
# positions on the connection.
|
||||
return {
|
||||
user_id: last_seen_ts
|
||||
for user_id, db_connection_position, last_seen_ts in txn
|
||||
if db_connection_position == connection_position
|
||||
or db_connection_position is None
|
||||
}
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_sliding_sync_connection_lazy_members",
|
||||
get_sliding_sync_connection_lazy_members_txn,
|
||||
db_autocommit=True, # Avoid transaction for single read
|
||||
)
|
||||
|
||||
def _persist_sliding_sync_connection_lazy_members_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
connection_key: int,
|
||||
new_connection_position: int,
|
||||
all_changes: dict[str, RoomLazyMembershipChanges],
|
||||
) -> None:
|
||||
"""Persist that we have sent lazy membership for the given user IDs."""
|
||||
|
||||
now = self.clock.time_msec()
|
||||
|
||||
# Figure out which cache entries to add or update.
|
||||
#
|
||||
# These are either a) new entries we've never sent before (i.e. with a
|
||||
# None last_seen_ts), or b) where the `last_seen_ts` is old enough that
|
||||
# we want to update it.
|
||||
#
|
||||
# We don't update the timestamp every time to avoid hammering the DB
|
||||
# with writes, and we don't need the timestamp to be precise. It is used
|
||||
# to evict old entries that haven't been used in a while.
|
||||
to_update: list[tuple[str, str]] = []
|
||||
for room_id, room_changes in all_changes.items():
|
||||
user_ids_to_update = room_changes.get_returned_user_ids_to_update(
|
||||
self.clock
|
||||
)
|
||||
to_update.extend((room_id, user_id) for user_id in user_ids_to_update)
|
||||
|
||||
if to_update:
|
||||
# Upsert the new/updated entries.
|
||||
#
|
||||
# Ignore conflicts where the existing entry has a different
|
||||
# connection position (i.e. from a forked connection position). This
|
||||
# may mean that we lose some updates, but that's acceptable as this
|
||||
# is a cache and its fine for it to *not* include rows. (Downstream
|
||||
# this will cause us to maybe send a few extra lazy members down
|
||||
# sync, but we're allowed to send extra members).
|
||||
sql = """
|
||||
INSERT INTO sliding_sync_connection_lazy_members
|
||||
(connection_key, connection_position, room_id, user_id, last_seen_ts)
|
||||
VALUES {value_placeholder}
|
||||
ON CONFLICT (connection_key, room_id, user_id)
|
||||
DO UPDATE SET last_seen_ts = EXCLUDED.last_seen_ts
|
||||
WHERE sliding_sync_connection_lazy_members.connection_position IS NULL
|
||||
OR sliding_sync_connection_lazy_members.connection_position = EXCLUDED.connection_position
|
||||
"""
|
||||
|
||||
args = [
|
||||
(connection_key, new_connection_position, room_id, user_id, now)
|
||||
for room_id, user_id in to_update
|
||||
]
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
sql = sql.format(value_placeholder="?")
|
||||
txn.execute_values(sql, args, fetch=False)
|
||||
else:
|
||||
sql = sql.format(value_placeholder="(?, ?, ?, ?, ?)")
|
||||
txn.execute_batch(sql, args)
|
||||
|
||||
# Remove any invalidated entries.
|
||||
to_remove: list[tuple[str, str]] = []
|
||||
for room_id, room_changes in all_changes.items():
|
||||
for user_id in room_changes.invalidated_user_ids:
|
||||
to_remove.append((room_id, user_id))
|
||||
|
||||
if to_remove:
|
||||
# We don't try and match on connection position here: it's fine to
|
||||
# remove it from all forks. This is a cache so it's fine to expire
|
||||
# arbitrary entries, the worst that happens is we send a few extra
|
||||
# lazy members down sync.
|
||||
self.db_pool.simple_delete_many_batch_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_lazy_members",
|
||||
keys=("connection_key", "room_id", "user_id"),
|
||||
values=[
|
||||
(connection_key, room_id, user_id) for room_id, user_id in to_remove
|
||||
],
|
||||
)
|
||||
|
||||
@wrap_as_background_process("delete_old_sliding_sync_connections")
|
||||
async def delete_old_sliding_sync_connections(self) -> None:
|
||||
"""Delete sliding sync connections that have not been used for a long time."""
|
||||
@@ -564,6 +736,10 @@ class PerConnectionStateDB:
|
||||
|
||||
room_configs: Mapping[str, "RoomSyncConfig"]
|
||||
|
||||
room_lazy_membership: dict[str, RoomLazyMembershipChanges]
|
||||
"""Lazy membership changes to persist alongside this state. Only used
|
||||
when persisting."""
|
||||
|
||||
@staticmethod
|
||||
async def from_state(
|
||||
per_connection_state: "MutablePerConnectionState", store: "DataStore"
|
||||
@@ -618,6 +794,7 @@ class PerConnectionStateDB:
|
||||
receipts=RoomStatusMap(receipts),
|
||||
account_data=RoomStatusMap(account_data),
|
||||
room_configs=per_connection_state.room_configs.maps[0],
|
||||
room_lazy_membership=per_connection_state.room_lazy_membership,
|
||||
)
|
||||
|
||||
async def to_state(self, store: "DataStore") -> "PerConnectionState":
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2025 Element Creations Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
|
||||
-- Tracks which member states have been sent to the client for lazy-loaded
|
||||
-- members in sliding sync. This is a *cache* as it doesn't matter if we send
|
||||
-- down members we've previously sent down, i.e. it's safe to delete any rows.
|
||||
--
|
||||
-- We could have tracked these as part of the
|
||||
-- `sliding_sync_connection_required_state` table, but that would bloat that
|
||||
-- table significantly as most rooms will have lazy-loaded members. We want to
|
||||
-- keep that table small as we always pull out all rows for the connection for
|
||||
-- every request, so storing lots of data there would be bad for performance. To
|
||||
-- keep that table small we also deduplicate the requested state across
|
||||
-- different rooms, which if we stored lazy members there would prevent.
|
||||
--
|
||||
-- We track a *rough* `last_seen_ts` for each user in each room which indicates
|
||||
-- when we last would've sent their member state to the client. `last_seen_ts`
|
||||
-- is used so that we can remove members which haven't been seen for a while to
|
||||
-- save space. This is a *rough* timestamp as we don't want to update the
|
||||
-- timestamp every time to avoid hammering the DB with writes, and we don't need
|
||||
-- the timestamp to be precise (as it is used to evict old entries that haven't
|
||||
-- been used in a while).
|
||||
--
|
||||
-- Care must be taken when handling "forked" positions, i.e. we have responded
|
||||
-- to a request with a position and then get another different request using the
|
||||
-- previous position as a base. We track this by including a
|
||||
-- `connection_position` for newly inserted rows. When we advance the position
|
||||
-- we set this to NULL for all rows which were present at that position, and
|
||||
-- delete all other rows. When reading rows we can then filter out any rows
|
||||
-- which have a non-NULL `connection_position` which is not the current
|
||||
-- position.
|
||||
--
|
||||
-- I.e. `connection_position` is NULL for rows which are valid for *all*
|
||||
-- positions on the connection, and is non-NULL for rows which are only valid
|
||||
-- for a specific position.
|
||||
--
|
||||
-- When invalidating rows, we can just delete them. Technically this could
|
||||
-- invalidate for a forked position, but this is acceptable as equivalent to a
|
||||
-- cache eviction.
|
||||
CREATE TABLE sliding_sync_connection_lazy_members (
|
||||
connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
|
||||
connection_position BIGINT REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE,
|
||||
room_id TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
last_seen_ts BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX sliding_sync_connection_lazy_members_idx ON sliding_sync_connection_lazy_members (connection_key, room_id, user_id);
|
||||
CREATE INDEX sliding_sync_connection_lazy_members_pos_idx ON sliding_sync_connection_lazy_members (connection_key, connection_position) WHERE connection_position IS NOT NULL;
|
||||
@@ -0,0 +1,27 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2025 Element Creations, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
-- Add a timestamp for when the sliding sync connection position was last used,
|
||||
-- only updated with a small granularity.
|
||||
--
|
||||
-- This should be NOT NULL, but we need to consider existing rows. In future we
|
||||
-- may want to either backfill this or delete all rows with a NULL value (and
|
||||
-- then make it NOT NULL).
|
||||
ALTER TABLE local_media_repository ADD COLUMN quarantined_ts BIGINT;
|
||||
ALTER TABLE remote_media_cache ADD COLUMN quarantined_ts BIGINT;
|
||||
|
||||
UPDATE local_media_repository SET quarantined_ts = 0 WHERE quarantined_by IS NOT NULL;
|
||||
UPDATE remote_media_cache SET quarantined_ts = 0 WHERE quarantined_by IS NOT NULL;
|
||||
|
||||
-- Note: We *probably* should have an index on quarantined_ts, but we're going
|
||||
-- to try to defer that to a future migration after seeing the performance impact.
|
||||
@@ -49,12 +49,21 @@ from synapse.types import (
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
from synapse.util.clock import Clock
|
||||
from synapse.util.duration import Duration
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# How often to update the last seen timestamp for lazy members.
|
||||
#
|
||||
# We don't update the timestamp every time to avoid hammering the DB with
|
||||
# writes, and we don't need the timestamp to be precise (as it is used to evict
|
||||
# old entries that haven't been used in a while).
|
||||
LAZY_MEMBERS_UPDATE_INTERVAL = Duration(hours=1)
|
||||
|
||||
|
||||
class SlidingSyncConfig(SlidingSyncBody):
|
||||
"""
|
||||
@@ -891,6 +900,69 @@ class PerConnectionState:
|
||||
return len(self.rooms) + len(self.receipts) + len(self.room_configs)
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class RoomLazyMembershipChanges:
|
||||
"""Changes to lazily-loaded room memberships for a given room."""
|
||||
|
||||
returned_user_id_to_last_seen_ts_map: Mapping[str, int | None] = attr.Factory(dict)
|
||||
"""Map from user ID to timestamp for users whose membership we have lazily
|
||||
loaded in this room an request. The timestamp indicates the time we
|
||||
previously needed the membership, or None if we sent it down for the first
|
||||
time in this request.
|
||||
|
||||
We track a *rough* `last_seen_ts` for each user in each room which indicates
|
||||
when we last would've sent their member state to the client. This is used so
|
||||
that we can remove members which haven't been seen for a while to save
|
||||
space.
|
||||
|
||||
Note: this will include users whose membership we would have sent down but
|
||||
didn't due to us having previously sent them.
|
||||
"""
|
||||
|
||||
invalidated_user_ids: AbstractSet[str] = attr.Factory(set)
|
||||
"""Set of user IDs whose latest membership we have *not* sent down"""
|
||||
|
||||
def get_returned_user_ids_to_update(self, clock: Clock) -> StrCollection:
|
||||
"""Get the user IDs whose last seen timestamp we need to update in the
|
||||
database.
|
||||
|
||||
This is a subset of user IDs in `returned_user_id_to_last_seen_ts_map`,
|
||||
whose timestamp is either None (first time we've sent them) or older
|
||||
than `LAZY_MEMBERS_UPDATE_INTERVAL`.
|
||||
|
||||
We only update the timestamp in the database every so often to avoid
|
||||
hammering the DB with writes. We don't need the timestamp to be precise,
|
||||
as the timestamp is used to evict old entries that haven't been used in
|
||||
a while.
|
||||
"""
|
||||
|
||||
now_ms = clock.time_msec()
|
||||
return [
|
||||
user_id
|
||||
for user_id, last_seen_ts in self.returned_user_id_to_last_seen_ts_map.items()
|
||||
if last_seen_ts is None
|
||||
or now_ms - last_seen_ts >= LAZY_MEMBERS_UPDATE_INTERVAL.as_millis()
|
||||
]
|
||||
|
||||
def has_updates(self, clock: Clock) -> bool:
|
||||
"""Check if there are any updates to the lazy membership changes.
|
||||
|
||||
Called to check if we need to persist changes to the lazy membership
|
||||
state for the room. We want to avoid persisting the state if there are
|
||||
no changes, to avoid unnecessary writes (and cache misses due to new
|
||||
connection position).
|
||||
"""
|
||||
|
||||
# We consider there to be updates if there are any invalidated user
|
||||
# IDs...
|
||||
if self.invalidated_user_ids:
|
||||
return True
|
||||
|
||||
# ...or if any of the returned user IDs need their last seen timestamp
|
||||
# updating in the database.
|
||||
return bool(self.get_returned_user_ids_to_update(clock))
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class MutablePerConnectionState(PerConnectionState):
|
||||
"""A mutable version of `PerConnectionState`"""
|
||||
@@ -903,12 +975,28 @@ class MutablePerConnectionState(PerConnectionState):
|
||||
|
||||
room_configs: typing.ChainMap[str, RoomSyncConfig]
|
||||
|
||||
def has_updates(self) -> bool:
|
||||
# A map from room ID to the lazily-loaded memberships needed for the
|
||||
# request in that room.
|
||||
room_lazy_membership: dict[str, RoomLazyMembershipChanges] = attr.Factory(dict)
|
||||
|
||||
def has_updates(self, clock: Clock) -> bool:
|
||||
"""Check if there are any updates to the per-connection state that need
|
||||
persisting.
|
||||
|
||||
It is important that we don't spuriously do persistence, as that will
|
||||
always generate a new connection position which will invalidate some of
|
||||
the caches. It doesn't need to be perfect, but we should avoid always
|
||||
generating new connection positions when doing lazy loading
|
||||
"""
|
||||
return (
|
||||
bool(self.rooms.get_updates())
|
||||
or bool(self.receipts.get_updates())
|
||||
or bool(self.account_data.get_updates())
|
||||
or bool(self.get_room_config_updates())
|
||||
or any(
|
||||
change.has_updates(clock)
|
||||
for change in self.room_lazy_membership.values()
|
||||
)
|
||||
)
|
||||
|
||||
def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
#
|
||||
|
||||
import queue
|
||||
from typing import Any, BinaryIO, cast
|
||||
from typing import Any, BinaryIO, Optional, Union, cast
|
||||
|
||||
from twisted.internet import threads
|
||||
from twisted.internet.defer import Deferred
|
||||
@@ -50,7 +50,7 @@ class BackgroundFileConsumer:
|
||||
self._reactor: ISynapseReactor = reactor
|
||||
|
||||
# Producer we're registered with
|
||||
self._producer: IPushProducer | IPullProducer | None = None
|
||||
self._producer: Optional[Union[IPushProducer, IPullProducer]] = None
|
||||
|
||||
# True if PushProducer, false if PullProducer
|
||||
self.streaming = False
|
||||
@@ -72,7 +72,7 @@ class BackgroundFileConsumer:
|
||||
self._write_exception: Exception | None = None
|
||||
|
||||
def registerProducer(
|
||||
self, producer: IPushProducer | IPullProducer, streaming: bool
|
||||
self, producer: Union[IPushProducer, IPullProducer], streaming: bool
|
||||
) -> None:
|
||||
"""Part of IConsumer interface
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
#
|
||||
#
|
||||
import logging
|
||||
from typing import AbstractSet, Mapping
|
||||
from typing import AbstractSet
|
||||
from unittest.mock import patch
|
||||
|
||||
import attr
|
||||
@@ -38,13 +38,17 @@ from synapse.handlers.sliding_sync import (
|
||||
RoomSyncConfig,
|
||||
StateValues,
|
||||
_required_state_changes,
|
||||
_RequiredStateChangesReturn,
|
||||
)
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import knock, login, room
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
from synapse.types import JsonDict, StateMap, StreamToken, UserID, create_requester
|
||||
from synapse.types.handlers.sliding_sync import PerConnectionState, SlidingSyncConfig
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
PerConnectionState,
|
||||
SlidingSyncConfig,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.clock import Clock
|
||||
|
||||
@@ -3827,12 +3831,11 @@ class RequiredStateChangesTestParameters:
|
||||
previous_required_state_map: dict[str, set[str]]
|
||||
request_required_state_map: dict[str, set[str]]
|
||||
state_deltas: StateMap[str]
|
||||
expected_with_state_deltas: tuple[
|
||||
Mapping[str, AbstractSet[str]] | None, StateFilter
|
||||
]
|
||||
expected_without_state_deltas: tuple[
|
||||
Mapping[str, AbstractSet[str]] | None, StateFilter
|
||||
]
|
||||
expected_with_state_deltas: _RequiredStateChangesReturn
|
||||
expected_without_state_deltas: _RequiredStateChangesReturn
|
||||
|
||||
previously_returned_lazy_user_ids: AbstractSet[str] = frozenset()
|
||||
request_lazy_load_user_ids: AbstractSet[str] = frozenset()
|
||||
|
||||
|
||||
class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
@@ -3848,8 +3851,12 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
request_required_state_map={"type1": {"state_key"}},
|
||||
state_deltas={("type1", "state_key"): "$event_id"},
|
||||
# No changes
|
||||
expected_with_state_deltas=(None, StateFilter.none()),
|
||||
expected_without_state_deltas=(None, StateFilter.none()),
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
None, StateFilter.none()
|
||||
),
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
None, StateFilter.none()
|
||||
),
|
||||
),
|
||||
),
|
||||
(
|
||||
@@ -3862,14 +3869,14 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
"type2": {"state_key"},
|
||||
},
|
||||
state_deltas={("type2", "state_key"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# We've added a type so we should persist the changed required state
|
||||
# config.
|
||||
{"type1": {"state_key"}, "type2": {"state_key"}},
|
||||
# We should see the new type added
|
||||
StateFilter.from_types([("type2", "state_key")]),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {"state_key"}, "type2": {"state_key"}},
|
||||
StateFilter.from_types([("type2", "state_key")]),
|
||||
),
|
||||
@@ -3885,7 +3892,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
"type2": {"state_key"},
|
||||
},
|
||||
state_deltas={("type2", "state_key"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# We've added a type so we should persist the changed required state
|
||||
# config.
|
||||
{"type1": {"state_key"}, "type2": {"state_key"}},
|
||||
@@ -3894,7 +3901,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
[("type1", "state_key"), ("type2", "state_key")]
|
||||
),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {"state_key"}, "type2": {"state_key"}},
|
||||
StateFilter.from_types(
|
||||
[("type1", "state_key"), ("type2", "state_key")]
|
||||
@@ -3909,14 +3916,14 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
previous_required_state_map={"type": {"state_key1"}},
|
||||
request_required_state_map={"type": {"state_key1", "state_key2"}},
|
||||
state_deltas={("type", "state_key2"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# We've added a key so we should persist the changed required state
|
||||
# config.
|
||||
{"type": {"state_key1", "state_key2"}},
|
||||
# We should see the new state_keys added
|
||||
StateFilter.from_types([("type", "state_key2")]),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type": {"state_key1", "state_key2"}},
|
||||
StateFilter.from_types([("type", "state_key2")]),
|
||||
),
|
||||
@@ -3929,7 +3936,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
previous_required_state_map={"type": {"state_key1"}},
|
||||
request_required_state_map={"type": {"state_key2", "state_key3"}},
|
||||
state_deltas={("type", "state_key2"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# We've added a key so we should persist the changed required state
|
||||
# config.
|
||||
#
|
||||
@@ -3940,7 +3947,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
[("type", "state_key2"), ("type", "state_key3")]
|
||||
),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type": {"state_key1", "state_key2", "state_key3"}},
|
||||
StateFilter.from_types(
|
||||
[("type", "state_key2"), ("type", "state_key3")]
|
||||
@@ -3964,7 +3971,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
},
|
||||
request_required_state_map={"type1": {"state_key"}},
|
||||
state_deltas={("type2", "state_key"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Remove `type2` since there's been a change to that state,
|
||||
# (persist the change to required state). That way next time,
|
||||
# they request `type2`, we see that we haven't sent it before
|
||||
@@ -3975,7 +3982,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# `type2` is no longer requested but since that state hasn't
|
||||
# changed, nothing should change (we should still keep track
|
||||
# that we've sent `type2` before).
|
||||
@@ -3998,7 +4005,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
},
|
||||
request_required_state_map={},
|
||||
state_deltas={("type2", "state_key"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Remove `type2` since there's been a change to that state,
|
||||
# (persist the change to required state). That way next time,
|
||||
# they request `type2`, we see that we haven't sent it before
|
||||
@@ -4009,7 +4016,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# `type2` is no longer requested but since that state hasn't
|
||||
# changed, nothing should change (we should still keep track
|
||||
# that we've sent `type2` before).
|
||||
@@ -4029,7 +4036,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
previous_required_state_map={"type": {"state_key1", "state_key2"}},
|
||||
request_required_state_map={"type": {"state_key1"}},
|
||||
state_deltas={("type", "state_key2"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Remove `(type, state_key2)` since there's been a change
|
||||
# to that state (persist the change to required state).
|
||||
# That way next time, they request `(type, state_key2)`, we see
|
||||
@@ -4041,7 +4048,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# `(type, state_key2)` is no longer requested but since that
|
||||
# state hasn't changed, nothing should change (we should still
|
||||
# keep track that we've sent `(type, state_key1)` and `(type,
|
||||
@@ -4073,11 +4080,11 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
("other_type", "state_key"): "$event_id",
|
||||
},
|
||||
# We've added a wildcard, so we persist the change and request everything
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {"state_key2"}, StateValues.WILDCARD: {"state_key"}},
|
||||
StateFilter.all(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {"state_key2"}, StateValues.WILDCARD: {"state_key"}},
|
||||
StateFilter.all(),
|
||||
),
|
||||
@@ -4103,13 +4110,13 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
("other_type", "state_key"): "$event_id",
|
||||
},
|
||||
# We've removed a type wildcard, so we persist the change but don't request anything
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {"state_key2"}},
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {"state_key2"}},
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
@@ -4129,11 +4136,11 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
state_deltas={("type2", "state_key"): "$event_id"},
|
||||
# We've added a wildcard state_key, so we persist the change and
|
||||
# request all of the state for that type
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {"state_key"}, "type2": {StateValues.WILDCARD}},
|
||||
StateFilter.from_types([("type2", None)]),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {"state_key"}, "type2": {StateValues.WILDCARD}},
|
||||
StateFilter.from_types([("type2", None)]),
|
||||
),
|
||||
@@ -4151,7 +4158,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
state_deltas={("type2", "state_key"): "$event_id"},
|
||||
# We've removed a state_key wildcard, so we persist the change and
|
||||
# request nothing
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {"state_key"}},
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
@@ -4160,7 +4167,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# We've removed a state_key wildcard but there have been no matching
|
||||
# state changes, so no changes needed, just persist the
|
||||
# `request_required_state_map` as-is.
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
None,
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
@@ -4180,7 +4187,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
},
|
||||
request_required_state_map={"type1": {"state_key1"}},
|
||||
state_deltas={("type1", "state_key3"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# We've removed some state keys from the type, but only state_key3 was
|
||||
# changed so only that one should be removed.
|
||||
{"type1": {"state_key1", "state_key2"}},
|
||||
@@ -4188,7 +4195,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# No changes needed, just persist the
|
||||
# `request_required_state_map` as-is
|
||||
None,
|
||||
@@ -4207,14 +4214,14 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
previous_required_state_map={},
|
||||
request_required_state_map={"type1": {StateValues.ME}},
|
||||
state_deltas={("type1", "@user:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# We've added a type so we should persist the changed required state
|
||||
# config.
|
||||
{"type1": {StateValues.ME}},
|
||||
# We should see the new state_keys added
|
||||
StateFilter.from_types([("type1", "@user:test")]),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {StateValues.ME}},
|
||||
StateFilter.from_types([("type1", "@user:test")]),
|
||||
),
|
||||
@@ -4229,7 +4236,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
previous_required_state_map={"type1": {StateValues.ME}},
|
||||
request_required_state_map={},
|
||||
state_deltas={("type1", "@user:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Remove `type1` since there's been a change to that state,
|
||||
# (persist the change to required state). That way next time,
|
||||
# they request `type1`, we see that we haven't sent it before
|
||||
@@ -4240,7 +4247,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# `type1` is no longer requested but since that state hasn't
|
||||
# changed, nothing should change (we should still keep track
|
||||
# that we've sent `type1` before).
|
||||
@@ -4260,14 +4267,14 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
previous_required_state_map={},
|
||||
request_required_state_map={"type1": {"@user:test"}},
|
||||
state_deltas={("type1", "@user:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# We've added a type so we should persist the changed required state
|
||||
# config.
|
||||
{"type1": {"@user:test"}},
|
||||
# We should see the new state_keys added
|
||||
StateFilter.from_types([("type1", "@user:test")]),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {"@user:test"}},
|
||||
StateFilter.from_types([("type1", "@user:test")]),
|
||||
),
|
||||
@@ -4282,7 +4289,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
previous_required_state_map={"type1": {"@user:test"}},
|
||||
request_required_state_map={},
|
||||
state_deltas={("type1", "@user:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Remove `type1` since there's been a change to that state,
|
||||
# (persist the change to required state). That way next time,
|
||||
# they request `type1`, we see that we haven't sent it before
|
||||
@@ -4293,7 +4300,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# `type1` is no longer requested but since that state hasn't
|
||||
# changed, nothing should change (we should still keep track
|
||||
# that we've sent `type1` before).
|
||||
@@ -4313,13 +4320,13 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
previous_required_state_map={},
|
||||
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
state_deltas={(EventTypes.Member, "@user:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# If a "$LAZY" has been added or removed we always update the
|
||||
# required state to what was requested for simplicity.
|
||||
{EventTypes.Member: {StateValues.LAZY}},
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{EventTypes.Member: {StateValues.LAZY}},
|
||||
StateFilter.none(),
|
||||
),
|
||||
@@ -4334,7 +4341,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
previous_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
request_required_state_map={},
|
||||
state_deltas={(EventTypes.Member, "@user:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# If a "$LAZY" has been added or removed we always update the
|
||||
# required state to what was requested for simplicity.
|
||||
{},
|
||||
@@ -4342,7 +4349,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# `EventTypes.Member` is no longer requested but since that
|
||||
# state hasn't changed, nothing should change (we should still
|
||||
# keep track that we've sent `EventTypes.Member` before).
|
||||
@@ -4361,41 +4368,40 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
we're sending down another response without any timeline events.
|
||||
""",
|
||||
RequiredStateChangesTestParameters(
|
||||
previous_required_state_map={
|
||||
EventTypes.Member: {
|
||||
StateValues.LAZY,
|
||||
"@user2:test",
|
||||
"@user3:test",
|
||||
}
|
||||
},
|
||||
previous_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
previously_returned_lazy_user_ids={"@user2:test", "@user3:test"},
|
||||
request_lazy_load_user_ids=set(),
|
||||
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# The `request_required_state_map` hasn't changed
|
||||
None,
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
# Previous request did not include any explicit members,
|
||||
# so there is no extra users to add to the lazy cache.
|
||||
extra_users_to_add_to_lazy_cache=frozenset(),
|
||||
# Remove "@user2:test" since that state has changed and is no
|
||||
# longer being requested anymore. Since something was removed,
|
||||
# we should persist the changed to required state. That way next
|
||||
# time, they request "@user2:test", we see that we haven't sent
|
||||
# it before and send the new state. (we should still keep track
|
||||
# that we've sent specific `EventTypes.Member` before)
|
||||
{
|
||||
EventTypes.Member: {
|
||||
StateValues.LAZY,
|
||||
"@user3:test",
|
||||
}
|
||||
},
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
lazy_members_invalidated={"@user2:test"},
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
# We're not requesting any specific `EventTypes.Member` now but
|
||||
# since that state hasn't changed, nothing should change (we
|
||||
# should still keep track that we've sent specific
|
||||
# `EventTypes.Member` before).
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# The `request_required_state_map` hasn't changed
|
||||
None,
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
# Previous request did not include any explicit members,
|
||||
# so there is no extra users to add to the lazy cache.
|
||||
extra_users_to_add_to_lazy_cache=frozenset(),
|
||||
# Nothing should change (we should still keep track that
|
||||
# we've sent specific `EventTypes.Member` before).
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
),
|
||||
),
|
||||
@@ -4407,50 +4413,37 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
we're sending down another response with a new event from user4.
|
||||
""",
|
||||
RequiredStateChangesTestParameters(
|
||||
previous_required_state_map={
|
||||
EventTypes.Member: {
|
||||
StateValues.LAZY,
|
||||
"@user2:test",
|
||||
"@user3:test",
|
||||
}
|
||||
},
|
||||
request_required_state_map={
|
||||
EventTypes.Member: {StateValues.LAZY, "@user4:test"}
|
||||
},
|
||||
previous_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
previously_returned_lazy_user_ids={"@user2:test", "@user3:test"},
|
||||
request_lazy_load_user_ids={"@user4:test"},
|
||||
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
# Since "@user4:test" was added, we should persist the changed
|
||||
# required state config.
|
||||
#
|
||||
# Also remove "@user2:test" since that state has changed and is no
|
||||
# longer being requested anymore. Since something was removed,
|
||||
# we also should persist the changed to required state. That way next
|
||||
# time, they request "@user2:test", we see that we haven't sent
|
||||
# it before and send the new state. (we should still keep track
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# The `request_required_state_map` hasn't changed
|
||||
None,
|
||||
# We should see the new state_keys added
|
||||
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
|
||||
# Previous request did not include any explicit members,
|
||||
# so there is no extra users to add to the lazy cache.
|
||||
extra_users_to_add_to_lazy_cache=frozenset(),
|
||||
# Remove "@user2:test" since that state has changed and
|
||||
# is no longer being requested anymore. Since something
|
||||
# was removed, we also should persist the changed to
|
||||
# required state. That way next time, they request
|
||||
# "@user2:test", we see that we haven't sent it before
|
||||
# and send the new state. (we should still keep track
|
||||
# that we've sent specific `EventTypes.Member` before)
|
||||
{
|
||||
EventTypes.Member: {
|
||||
StateValues.LAZY,
|
||||
"@user3:test",
|
||||
"@user4:test",
|
||||
}
|
||||
},
|
||||
# We should see the new state_keys added
|
||||
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
|
||||
lazy_members_invalidated={"@user2:test"},
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
# Since "@user4:test" was added, we should persist the changed
|
||||
# required state config.
|
||||
{
|
||||
EventTypes.Member: {
|
||||
StateValues.LAZY,
|
||||
"@user2:test",
|
||||
"@user3:test",
|
||||
"@user4:test",
|
||||
}
|
||||
},
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# The `request_required_state_map` hasn't changed
|
||||
None,
|
||||
# We should see the new state_keys added
|
||||
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
|
||||
# Previous request did not include any explicit members,
|
||||
# so there is no extra users to add to the lazy cache.
|
||||
extra_users_to_add_to_lazy_cache=frozenset(),
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
),
|
||||
),
|
||||
@@ -4464,40 +4457,81 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
EventTypes.Member: {"@user2:test", "@user3:test"}
|
||||
},
|
||||
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
previously_returned_lazy_user_ids=frozenset(),
|
||||
request_lazy_load_user_ids=frozenset(),
|
||||
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Since `StateValues.LAZY` was added, we should persist the
|
||||
# changed required state config.
|
||||
#
|
||||
# Also remove "@user2:test" since that state has changed and is no
|
||||
# longer being requested anymore. Since something was removed,
|
||||
# we also should persist the changed to required state. That way next
|
||||
# time, they request "@user2:test", we see that we haven't sent
|
||||
# it before and send the new state. (we should still keep track
|
||||
# that we've sent specific `EventTypes.Member` before)
|
||||
{
|
||||
EventTypes.Member: {
|
||||
StateValues.LAZY,
|
||||
"@user3:test",
|
||||
}
|
||||
},
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
{EventTypes.Member: {StateValues.LAZY}},
|
||||
# No users are being lazy loaded, so nothing to request.
|
||||
StateFilter.none(),
|
||||
# Remember the fact that we've sent @user3 down before,
|
||||
# but not @user2 as that has been invalidated.
|
||||
extra_users_to_add_to_lazy_cache={"@user3:test"},
|
||||
# Nothing to invalidate as there are no existing lazy members.
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# Since `StateValues.LAZY` was added, we should persist the
|
||||
# changed required state config.
|
||||
{
|
||||
EventTypes.Member: {
|
||||
StateValues.LAZY,
|
||||
"@user2:test",
|
||||
"@user3:test",
|
||||
}
|
||||
},
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
{EventTypes.Member: {StateValues.LAZY}},
|
||||
# No users are being lazy loaded, so nothing to request.
|
||||
StateFilter.none(),
|
||||
# Remember the fact that we've sent the users down before.
|
||||
extra_users_to_add_to_lazy_cache={"@user2:test", "@user3:test"},
|
||||
# Nothing to invalidate as there are no existing lazy members.
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
),
|
||||
),
|
||||
(
|
||||
"state_key_expand_lazy_keep_previous_memberships_need_previous_sent",
|
||||
"""
|
||||
Test expanding the `required_state` to lazy-loading room
|
||||
members. If a previously explicit membership is requested then
|
||||
we should not send it again (as it was already sent before).
|
||||
""",
|
||||
RequiredStateChangesTestParameters(
|
||||
previous_required_state_map={
|
||||
EventTypes.Member: {"@user2:test", "@user3:test"}
|
||||
},
|
||||
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
previously_returned_lazy_user_ids=frozenset(),
|
||||
request_lazy_load_user_ids={"@user3:test"},
|
||||
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Since `StateValues.LAZY` was added, we should persist the
|
||||
# changed required state config.
|
||||
{EventTypes.Member: {StateValues.LAZY}},
|
||||
# We have already sent @user3 down before.
|
||||
#
|
||||
# `@user3:test` is required for lazy loading, but we've
|
||||
# already sent it down before (due to it being in
|
||||
# `previous_required_state_map`), so we don't need to
|
||||
# request it again.
|
||||
StateFilter.none(),
|
||||
# Remember the fact that we've sent @user3 down before,
|
||||
# but not @user2 as that has been invalidated.
|
||||
extra_users_to_add_to_lazy_cache={"@user3:test"},
|
||||
# Nothing to invalidate as there are no existing lazy members.
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# Since `StateValues.LAZY` was added, we should persist the
|
||||
# changed required state config.
|
||||
{EventTypes.Member: {StateValues.LAZY}},
|
||||
# We have already sent @user3 down before.
|
||||
#
|
||||
# `@user3:test` is required for lazy loading, but we've
|
||||
# already sent it down before (due to it being in
|
||||
# `previous_required_state_map`), so we don't need to
|
||||
# request it again.
|
||||
StateFilter.none(),
|
||||
# Remember the fact that we've sent the users down before.
|
||||
extra_users_to_add_to_lazy_cache={"@user2:test", "@user3:test"},
|
||||
# Nothing to invalidate as there are no existing lazy members.
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
),
|
||||
),
|
||||
@@ -4507,36 +4541,33 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
Test retracting the `required_state` to no longer lazy-loading room members.
|
||||
""",
|
||||
RequiredStateChangesTestParameters(
|
||||
previous_required_state_map={
|
||||
EventTypes.Member: {
|
||||
StateValues.LAZY,
|
||||
"@user2:test",
|
||||
"@user3:test",
|
||||
}
|
||||
},
|
||||
previous_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
request_required_state_map={},
|
||||
previously_returned_lazy_user_ids={"@user2:test", "@user3:test"},
|
||||
request_lazy_load_user_ids=set(),
|
||||
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Remove `EventTypes.Member` since there's been a change to that
|
||||
# state, (persist the change to required state). That way next
|
||||
# time, they request `EventTypes.Member`, we see that we haven't
|
||||
# sent it before and send the new state. (if we were tracking
|
||||
# that we sent any other state, we should still keep track
|
||||
# that).
|
||||
#
|
||||
# This acts the same as the `simple_remove_type` test. It's
|
||||
# possible that we could remember the specific `state_keys` that
|
||||
# we have sent down before but this currently just acts the same
|
||||
# as if a whole `type` was removed. Perhaps it's good that we
|
||||
# "garbage collect" and forget what we've sent before for a
|
||||
# given `type` when the client stops caring about a certain
|
||||
# `type`.
|
||||
# state, (persist the change to required state).
|
||||
{},
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
# Previous request did not include any explicit members,
|
||||
# so there is no extra users to add to the lazy cache.
|
||||
extra_users_to_add_to_lazy_cache=frozenset(),
|
||||
# Explicitly remove the now invalidated @user2:test
|
||||
# membership.
|
||||
#
|
||||
# We don't invalidate @user3:test as that membership
|
||||
# hasn't changed. We continue to store the existing lazy
|
||||
# members since they might be useful for future
|
||||
# requests. (Alternatively, we could invalidate all
|
||||
# members in the room when the client stops lazy
|
||||
# loading, but we opt to keep track of them).
|
||||
lazy_members_invalidated={"@user2:test"},
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# `EventTypes.Member` is no longer requested but since that
|
||||
# state hasn't changed, nothing should change (we should still
|
||||
# keep track that we've sent `EventTypes.Member` before).
|
||||
@@ -4544,13 +4575,20 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# We don't need to request anything more if they are requesting
|
||||
# less state now
|
||||
StateFilter.none(),
|
||||
# Previous request did not include any explicit members,
|
||||
# so there is no extra users to add to the lazy cache.
|
||||
extra_users_to_add_to_lazy_cache=frozenset(),
|
||||
# Nothing has been invalidated.
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
),
|
||||
),
|
||||
(
|
||||
"state_key_retract_lazy_keep_previous_memberships_with_new_memberships",
|
||||
"state_key_retract_lazy_keep_previous_explicit_memberships",
|
||||
"""
|
||||
Test retracting the `required_state` to no longer lazy-loading room members.
|
||||
Test removing explicit memberships from the `required_state`
|
||||
when lazy-loading room members tracks previously sent
|
||||
memberships.
|
||||
""",
|
||||
RequiredStateChangesTestParameters(
|
||||
previous_required_state_map={
|
||||
@@ -4560,39 +4598,144 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
"@user3:test",
|
||||
}
|
||||
},
|
||||
request_required_state_map={EventTypes.Member: {"@user4:test"}},
|
||||
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
previously_returned_lazy_user_ids=frozenset(),
|
||||
request_lazy_load_user_ids={"@user3:test"},
|
||||
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Since an explicit membership was removed, we record
|
||||
# the new required state config and move them to lazy
|
||||
# members.
|
||||
{EventTypes.Member: {StateValues.LAZY}},
|
||||
# We have already sent @user3 down before.
|
||||
#
|
||||
# `@user3:test` is required for lazy loading, but we've
|
||||
# already sent it down before (due to it being in
|
||||
# `previous_required_state_map`), so we don't need to
|
||||
# request it again.
|
||||
StateFilter.none(),
|
||||
# Remember the fact that we've sent @user3 down before,
|
||||
# but not @user2 as that has been invalidated.
|
||||
extra_users_to_add_to_lazy_cache={"@user3:test"},
|
||||
# Nothing to invalidate as there are no existing lazy members.
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# While some explicit memberships were removed, there were no
|
||||
# state changes, so we don't need to persist the new required
|
||||
# state config yet.
|
||||
None,
|
||||
# We have already sent @user3 down before.
|
||||
#
|
||||
# `@user3:test` is required for lazy loading, but we've
|
||||
# already sent it down before (due to it being in
|
||||
# `previous_required_state_map`), so we don't need to
|
||||
# request it again.
|
||||
StateFilter.none(),
|
||||
# Remember the fact that we've sent the users down before.
|
||||
extra_users_to_add_to_lazy_cache=frozenset(),
|
||||
# Nothing to invalidate as there are no existing lazy members.
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
),
|
||||
),
|
||||
(
|
||||
"state_key_retract_lazy_keep_previous_explicit_me_memberships",
|
||||
"""
|
||||
Test removing explicit $ME memberships from the `required_state`
|
||||
when lazy-loading room members tracks previously sent
|
||||
memberships.
|
||||
""",
|
||||
RequiredStateChangesTestParameters(
|
||||
previous_required_state_map={
|
||||
EventTypes.Member: {
|
||||
StateValues.LAZY,
|
||||
StateValues.ME,
|
||||
"@user2:test",
|
||||
}
|
||||
},
|
||||
request_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
previously_returned_lazy_user_ids=frozenset(),
|
||||
request_lazy_load_user_ids={"@user:test"},
|
||||
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Since an explicit membership was removed, we record
|
||||
# the new required state config and move them to lazy
|
||||
# members.
|
||||
{EventTypes.Member: {StateValues.LAZY}},
|
||||
# We have already sent @user down before.
|
||||
#
|
||||
# `@user:test` is required for lazy loading, but we've
|
||||
# already sent it down before (due to `StateValues.ME`
|
||||
# being in `previous_required_state_map`), so we don't
|
||||
# need to request it again.
|
||||
StateFilter.none(),
|
||||
# Remember the fact that we've sent @user down before,
|
||||
# but not @user2 as that has been invalidated.
|
||||
extra_users_to_add_to_lazy_cache={"@user:test"},
|
||||
# Nothing to invalidate as there are no existing lazy members.
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# While some explicit memberships were removed, there were no
|
||||
# state changes, so we don't need to persist the new required
|
||||
# state config yet.
|
||||
None,
|
||||
# We have already sent @user down before.
|
||||
#
|
||||
# `@user:test` is required for lazy loading, but we've
|
||||
# already sent it down before (due to `StateValues.ME`
|
||||
# being in `previous_required_state_map`), so we don't
|
||||
# need to request it again.
|
||||
StateFilter.none(),
|
||||
# No relevant state has changed and we don't persist the
|
||||
# changed required_state_map, so we don't yet move the
|
||||
# $ME state to the lazy cache.
|
||||
extra_users_to_add_to_lazy_cache=frozenset(),
|
||||
# Nothing to invalidate as there are no existing lazy members.
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
),
|
||||
),
|
||||
(
|
||||
"state_key_retract_lazy_keep_previous_memberships_with_new_memberships",
|
||||
"""
|
||||
Test retracting the `required_state` to no longer lazy-loading room members.
|
||||
""",
|
||||
RequiredStateChangesTestParameters(
|
||||
previous_required_state_map={EventTypes.Member: {StateValues.LAZY}},
|
||||
request_required_state_map={EventTypes.Member: {"@user4:test"}},
|
||||
previously_returned_lazy_user_ids={"@user2:test", "@user3:test"},
|
||||
request_lazy_load_user_ids=frozenset(),
|
||||
state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"},
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
# Since "@user4:test" was added, we should persist the changed
|
||||
# required state config.
|
||||
#
|
||||
{EventTypes.Member: {"@user4:test"}},
|
||||
# We should see the new state_keys added
|
||||
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
|
||||
# Previous request did not include any explicit members,
|
||||
# so there is no extra users to add to the lazy cache.
|
||||
extra_users_to_add_to_lazy_cache=frozenset(),
|
||||
# Also remove "@user2:test" since that state has changed and is no
|
||||
# longer being requested anymore. Since something was removed,
|
||||
# we also should persist the changed to required state. That way next
|
||||
# time, they request "@user2:test", we see that we haven't sent
|
||||
# it before and send the new state. (we should still keep track
|
||||
# that we've sent specific `EventTypes.Member` before)
|
||||
{
|
||||
EventTypes.Member: {
|
||||
"@user3:test",
|
||||
"@user4:test",
|
||||
}
|
||||
},
|
||||
# We should see the new state_keys added
|
||||
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
|
||||
lazy_members_invalidated={"@user2:test"},
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
# Since "@user4:test" was added, we should persist the changed
|
||||
# required state config.
|
||||
{
|
||||
EventTypes.Member: {
|
||||
"@user2:test",
|
||||
"@user3:test",
|
||||
"@user4:test",
|
||||
}
|
||||
},
|
||||
{EventTypes.Member: {"@user4:test"}},
|
||||
# We should see the new state_keys added
|
||||
StateFilter.from_types([(EventTypes.Member, "@user4:test")]),
|
||||
# Previous request did not include any explicit members,
|
||||
# so there is no extra users to add to the lazy cache.
|
||||
extra_users_to_add_to_lazy_cache=frozenset(),
|
||||
# We don't invalidate user2 as they haven't changed
|
||||
lazy_members_invalidated=frozenset(),
|
||||
),
|
||||
),
|
||||
),
|
||||
@@ -4613,7 +4756,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# room required state config to match the request. And since we we're previously
|
||||
# already fetching everything, we don't have to fetch anything now that they've
|
||||
# narrowed.
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
{
|
||||
StateValues.WILDCARD: {
|
||||
"state_key1",
|
||||
@@ -4623,7 +4766,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
},
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{
|
||||
StateValues.WILDCARD: {
|
||||
"state_key1",
|
||||
@@ -4649,11 +4792,11 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
},
|
||||
state_deltas={("type1", "state_key1"): "$event_id"},
|
||||
# We've added a wildcard, so we persist the change and request everything
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
{StateValues.WILDCARD: {StateValues.WILDCARD}},
|
||||
StateFilter.all(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{StateValues.WILDCARD: {StateValues.WILDCARD}},
|
||||
StateFilter.all(),
|
||||
),
|
||||
@@ -4673,7 +4816,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# request. And since we we're previously already fetching
|
||||
# everything, we don't have to fetch anything now that they've
|
||||
# narrowed.
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
{
|
||||
"type1": {
|
||||
"state_key1",
|
||||
@@ -4683,7 +4826,7 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
},
|
||||
StateFilter.none(),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{
|
||||
"type1": {
|
||||
"state_key1",
|
||||
@@ -4708,11 +4851,11 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
# update the effective room required state config to match the
|
||||
# request. And we need to request all of the state for that type
|
||||
# because we previously, only sent down a few keys.
|
||||
expected_with_state_deltas=(
|
||||
expected_with_state_deltas=_RequiredStateChangesReturn(
|
||||
{"type1": {StateValues.WILDCARD, "state_key2", "state_key3"}},
|
||||
StateFilter.from_types([("type1", None)]),
|
||||
),
|
||||
expected_without_state_deltas=(
|
||||
expected_without_state_deltas=_RequiredStateChangesReturn(
|
||||
{
|
||||
"type1": {
|
||||
StateValues.WILDCARD,
|
||||
@@ -4734,42 +4877,66 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
test_parameters: RequiredStateChangesTestParameters,
|
||||
) -> None:
|
||||
# Without `state_deltas`
|
||||
changed_required_state_map, added_state_filter = _required_state_changes(
|
||||
state_changes = _required_state_changes(
|
||||
user_id="@user:test",
|
||||
prev_required_state_map=test_parameters.previous_required_state_map,
|
||||
request_required_state_map=test_parameters.request_required_state_map,
|
||||
previously_returned_lazy_user_ids=test_parameters.previously_returned_lazy_user_ids,
|
||||
request_lazy_load_user_ids=test_parameters.request_lazy_load_user_ids,
|
||||
state_deltas={},
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
changed_required_state_map,
|
||||
test_parameters.expected_without_state_deltas[0],
|
||||
state_changes.changed_required_state_map,
|
||||
test_parameters.expected_without_state_deltas.changed_required_state_map,
|
||||
"changed_required_state_map does not match (without state_deltas)",
|
||||
)
|
||||
self.assertEqual(
|
||||
added_state_filter,
|
||||
test_parameters.expected_without_state_deltas[1],
|
||||
state_changes.added_state_filter,
|
||||
test_parameters.expected_without_state_deltas.added_state_filter,
|
||||
"added_state_filter does not match (without state_deltas)",
|
||||
)
|
||||
self.assertEqual(
|
||||
state_changes.lazy_members_invalidated,
|
||||
test_parameters.expected_without_state_deltas.lazy_members_invalidated,
|
||||
"lazy_members_invalidated does not match (without state_deltas)",
|
||||
)
|
||||
self.assertEqual(
|
||||
state_changes.extra_users_to_add_to_lazy_cache,
|
||||
test_parameters.expected_without_state_deltas.extra_users_to_add_to_lazy_cache,
|
||||
"lazy_members_previously_returned does not match (without state_deltas)",
|
||||
)
|
||||
|
||||
# With `state_deltas`
|
||||
changed_required_state_map, added_state_filter = _required_state_changes(
|
||||
state_changes = _required_state_changes(
|
||||
user_id="@user:test",
|
||||
prev_required_state_map=test_parameters.previous_required_state_map,
|
||||
request_required_state_map=test_parameters.request_required_state_map,
|
||||
previously_returned_lazy_user_ids=test_parameters.previously_returned_lazy_user_ids,
|
||||
request_lazy_load_user_ids=test_parameters.request_lazy_load_user_ids,
|
||||
state_deltas=test_parameters.state_deltas,
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
changed_required_state_map,
|
||||
test_parameters.expected_with_state_deltas[0],
|
||||
state_changes.changed_required_state_map,
|
||||
test_parameters.expected_with_state_deltas.changed_required_state_map,
|
||||
"changed_required_state_map does not match (with state_deltas)",
|
||||
)
|
||||
self.assertEqual(
|
||||
added_state_filter,
|
||||
test_parameters.expected_with_state_deltas[1],
|
||||
state_changes.added_state_filter,
|
||||
test_parameters.expected_with_state_deltas.added_state_filter,
|
||||
"added_state_filter does not match (with state_deltas)",
|
||||
)
|
||||
self.assertEqual(
|
||||
state_changes.lazy_members_invalidated,
|
||||
test_parameters.expected_with_state_deltas.lazy_members_invalidated,
|
||||
"lazy_members_invalidated does not match (with state_deltas)",
|
||||
)
|
||||
self.assertEqual(
|
||||
state_changes.extra_users_to_add_to_lazy_cache,
|
||||
test_parameters.expected_with_state_deltas.extra_users_to_add_to_lazy_cache,
|
||||
"lazy_members_previously_returned does not match (with state_deltas)",
|
||||
)
|
||||
|
||||
@parameterized.expand(
|
||||
[
|
||||
@@ -4805,12 +4972,16 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
}
|
||||
|
||||
# (function under test)
|
||||
changed_required_state_map, added_state_filter = _required_state_changes(
|
||||
state_changes = _required_state_changes(
|
||||
user_id="@user:test",
|
||||
prev_required_state_map=previous_required_state_map,
|
||||
request_required_state_map=request_required_state_map,
|
||||
previously_returned_lazy_user_ids=frozenset(),
|
||||
request_lazy_load_user_ids=frozenset(),
|
||||
state_deltas={},
|
||||
)
|
||||
changed_required_state_map = state_changes.changed_required_state_map
|
||||
|
||||
assert changed_required_state_map is not None
|
||||
|
||||
# We should only remember up to the maximum number of state keys
|
||||
@@ -4874,12 +5045,16 @@ class RequiredStateChangesTestCase(unittest.TestCase):
|
||||
)
|
||||
|
||||
# (function under test)
|
||||
changed_required_state_map, added_state_filter = _required_state_changes(
|
||||
state_changes = _required_state_changes(
|
||||
user_id="@user:test",
|
||||
prev_required_state_map=previous_required_state_map,
|
||||
request_required_state_map=request_required_state_map,
|
||||
previously_returned_lazy_user_ids=frozenset(),
|
||||
request_lazy_load_user_ids=frozenset(),
|
||||
state_deltas={},
|
||||
)
|
||||
changed_required_state_map = state_changes.changed_required_state_map
|
||||
|
||||
assert changed_required_state_map is not None
|
||||
|
||||
# Should include all of the requested state
|
||||
|
||||
@@ -71,14 +71,43 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
return resources
|
||||
|
||||
def _ensure_quarantined(
|
||||
self, admin_user_tok: str, server_and_media_id: str
|
||||
self,
|
||||
user_tok: str,
|
||||
server_and_media_id: str,
|
||||
include_bypass_param: bool = False,
|
||||
) -> None:
|
||||
"""Ensure a piece of media is quarantined when trying to access it."""
|
||||
"""Ensure a piece of media is quarantined when trying to access it.
|
||||
|
||||
The include_bypass_param flag enables the presence of the
|
||||
admin_unsafely_bypass_quarantine query parameter, but still expects that the
|
||||
request will fail to download the media.
|
||||
"""
|
||||
if include_bypass_param:
|
||||
query_string = "?admin_unsafely_bypass_quarantine=true"
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_matrix/client/v1/media/download/{server_and_media_id}{query_string}",
|
||||
shorthand=False,
|
||||
access_token=user_tok,
|
||||
)
|
||||
|
||||
# Non-admins can't bypass, so this should fail regardless of whether the
|
||||
# media is actually quarantined.
|
||||
self.assertEqual(
|
||||
400,
|
||||
channel.code,
|
||||
msg=(
|
||||
"Expected to receive a 400 when bypassing quarantined media: %s"
|
||||
% server_and_media_id
|
||||
),
|
||||
)
|
||||
|
||||
# Repeat the request, this time without the bypass parameter.
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_matrix/client/v1/media/download/{server_and_media_id}",
|
||||
shorthand=False,
|
||||
access_token=admin_user_tok,
|
||||
access_token=user_tok,
|
||||
)
|
||||
|
||||
# Should be quarantined
|
||||
@@ -91,6 +120,62 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
),
|
||||
)
|
||||
|
||||
def test_admin_can_bypass_quarantine(self) -> None:
|
||||
self.register_user("admin", "pass", admin=True)
|
||||
admin_user_tok = self.login("admin", "pass")
|
||||
|
||||
# Upload some media
|
||||
response = self.helper.upload_media(SMALL_PNG, tok=admin_user_tok)
|
||||
|
||||
# Extract media ID from the response
|
||||
server_name_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
|
||||
server_name, media_id = server_name_and_media_id.split("/")
|
||||
|
||||
# Attempt to access the media
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_matrix/client/v1/media/download/{server_name_and_media_id}",
|
||||
shorthand=False,
|
||||
access_token=admin_user_tok,
|
||||
)
|
||||
|
||||
# Should be successful
|
||||
self.assertEqual(200, channel.code)
|
||||
|
||||
# Quarantine the media
|
||||
url = "/_synapse/admin/v1/media/quarantine/%s/%s" % (
|
||||
urllib.parse.quote(server_name),
|
||||
urllib.parse.quote(media_id),
|
||||
)
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
url,
|
||||
access_token=admin_user_tok,
|
||||
)
|
||||
self.pump(1.0)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
|
||||
# Now access it *without* the bypass parameter - this should fail (as expected).
|
||||
self._ensure_quarantined(
|
||||
admin_user_tok, server_name_and_media_id, include_bypass_param=False
|
||||
)
|
||||
|
||||
# Now access it *with* the bypass parameter - this should work
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_matrix/client/v1/media/download/{server_name_and_media_id}?admin_unsafely_bypass_quarantine=true",
|
||||
shorthand=False,
|
||||
access_token=admin_user_tok,
|
||||
)
|
||||
self.assertEqual(
|
||||
200,
|
||||
channel.code,
|
||||
msg=(
|
||||
"Expected to receive a 200 on accessing (with bypass) quarantined media: %s"
|
||||
% server_name_and_media_id
|
||||
),
|
||||
)
|
||||
|
||||
@parameterized.expand(
|
||||
[
|
||||
# Attempt quarantine media APIs as non-admin
|
||||
@@ -154,8 +239,14 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
self.pump(1.0)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
|
||||
# Attempt to access the media
|
||||
self._ensure_quarantined(admin_user_tok, server_name_and_media_id)
|
||||
# Attempt to access the media (and ensure non-admins can't download it, even
|
||||
# with a bypass parameter). Admins cannot download it without the bypass param.
|
||||
self._ensure_quarantined(
|
||||
non_admin_user_tok, server_name_and_media_id, include_bypass_param=True
|
||||
)
|
||||
self._ensure_quarantined(
|
||||
admin_user_tok, server_name_and_media_id, include_bypass_param=False
|
||||
)
|
||||
|
||||
@parameterized.expand(
|
||||
[
|
||||
@@ -214,9 +305,21 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
server_and_media_id_1 = mxc_1[6:]
|
||||
server_and_media_id_2 = mxc_2[6:]
|
||||
|
||||
# Test that we cannot download any of the media anymore
|
||||
self._ensure_quarantined(admin_user_tok, server_and_media_id_1)
|
||||
self._ensure_quarantined(admin_user_tok, server_and_media_id_2)
|
||||
# Test that we cannot download any of the media anymore, especially with the
|
||||
# bypass parameter set. Admins cannot download the media without supplying the
|
||||
# bypass parameter, so we check that too.
|
||||
self._ensure_quarantined(
|
||||
non_admin_user_tok, server_and_media_id_1, include_bypass_param=True
|
||||
)
|
||||
self._ensure_quarantined(
|
||||
non_admin_user_tok, server_and_media_id_2, include_bypass_param=True
|
||||
)
|
||||
self._ensure_quarantined(
|
||||
admin_user_tok, server_and_media_id_1, include_bypass_param=False
|
||||
)
|
||||
self._ensure_quarantined(
|
||||
admin_user_tok, server_and_media_id_2, include_bypass_param=False
|
||||
)
|
||||
|
||||
def test_quarantine_all_media_by_user(self) -> None:
|
||||
self.register_user("user_admin", "pass", admin=True)
|
||||
@@ -263,10 +366,27 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
channel.json_body, {"num_quarantined": 3}, "Expected 3 quarantined items"
|
||||
)
|
||||
|
||||
# Attempt to access each piece of media
|
||||
self._ensure_quarantined(admin_user_tok, server_and_media_id_1)
|
||||
self._ensure_quarantined(admin_user_tok, server_and_media_id_2)
|
||||
self._ensure_quarantined(admin_user_tok, server_and_media_id_3)
|
||||
# Attempt to access each piece of media, ensuring that it can't be downloaded
|
||||
# even with a bypass parameter. Admins should not be able to download the media
|
||||
# either when not supplying the bypass parameter, so we check that too.
|
||||
self._ensure_quarantined(
|
||||
non_admin_user_tok, server_and_media_id_1, include_bypass_param=True
|
||||
)
|
||||
self._ensure_quarantined(
|
||||
non_admin_user_tok, server_and_media_id_2, include_bypass_param=True
|
||||
)
|
||||
self._ensure_quarantined(
|
||||
non_admin_user_tok, server_and_media_id_3, include_bypass_param=True
|
||||
)
|
||||
self._ensure_quarantined(
|
||||
admin_user_tok, server_and_media_id_1, include_bypass_param=False
|
||||
)
|
||||
self._ensure_quarantined(
|
||||
admin_user_tok, server_and_media_id_2, include_bypass_param=False
|
||||
)
|
||||
self._ensure_quarantined(
|
||||
admin_user_tok, server_and_media_id_3, include_bypass_param=False
|
||||
)
|
||||
|
||||
def test_cannot_quarantine_safe_media(self) -> None:
|
||||
self.register_user("user_admin", "pass", admin=True)
|
||||
@@ -307,8 +427,14 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
|
||||
# Attempt to access each piece of media, the first should fail, the
|
||||
# second should succeed.
|
||||
self._ensure_quarantined(admin_user_tok, server_and_media_id_1)
|
||||
# second should succeed. We check both the non-admin user with a bypass
|
||||
# parameter, and the admin user without.
|
||||
self._ensure_quarantined(
|
||||
non_admin_user_tok, server_and_media_id_1, include_bypass_param=True
|
||||
)
|
||||
self._ensure_quarantined(
|
||||
admin_user_tok, server_and_media_id_1, include_bypass_param=False
|
||||
)
|
||||
|
||||
# Attempt to access each piece of media
|
||||
channel = self.make_request(
|
||||
|
||||
@@ -756,6 +756,112 @@ class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
|
||||
self.assertFalse(os.path.exists(local_path))
|
||||
|
||||
|
||||
class ListQuarantinedMediaTestCase(_AdminMediaTests):
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.store = hs.get_datastores().main
|
||||
self.server_name = hs.hostname
|
||||
|
||||
@parameterized.expand(["local", "remote"])
|
||||
def test_no_auth(self, kind: str) -> None:
|
||||
"""
|
||||
Try to list quarantined media without authentication.
|
||||
"""
|
||||
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
"/_synapse/admin/v1/media/quarantined?kind=%s" % (kind,),
|
||||
)
|
||||
|
||||
self.assertEqual(401, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
|
||||
|
||||
@parameterized.expand(["local", "remote"])
|
||||
def test_requester_is_not_admin(self, kind: str) -> None:
|
||||
"""
|
||||
If the user is not a server admin, an error is returned.
|
||||
"""
|
||||
self.other_user = self.register_user("user", "pass")
|
||||
self.other_user_token = self.login("user", "pass")
|
||||
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
"/_synapse/admin/v1/media/quarantined?kind=%s" % (kind,),
|
||||
access_token=self.other_user_token,
|
||||
)
|
||||
|
||||
self.assertEqual(403, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
|
||||
|
||||
def test_list_quarantined_media(self) -> None:
|
||||
"""
|
||||
Ensure we actually get results for each page. We can't really test that
|
||||
remote media is quarantined, but we can test that local media is.
|
||||
"""
|
||||
self.admin_user = self.register_user("admin", "pass", admin=True)
|
||||
self.admin_user_tok = self.login("admin", "pass")
|
||||
|
||||
def _upload() -> str:
|
||||
return self.helper.upload_media(
|
||||
SMALL_PNG, tok=self.admin_user_tok, expect_code=200
|
||||
)["content_uri"][6:].split("/")[1] # Cut off 'mxc://' and domain
|
||||
|
||||
self.media_id_1 = _upload()
|
||||
self.media_id_2 = _upload()
|
||||
self.media_id_3 = _upload()
|
||||
|
||||
def _quarantine(media_id: str) -> None:
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
"/_synapse/admin/v1/media/quarantine/%s/%s"
|
||||
% (
|
||||
self.server_name,
|
||||
media_id,
|
||||
),
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
|
||||
_quarantine(self.media_id_1)
|
||||
_quarantine(self.media_id_2)
|
||||
_quarantine(self.media_id_3)
|
||||
|
||||
# Page 1
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
"/_synapse/admin/v1/media/quarantined?kind=local&from=0&limit=1",
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(1, len(channel.json_body["media"]))
|
||||
|
||||
# Page 2
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
"/_synapse/admin/v1/media/quarantined?kind=local&from=1&limit=1",
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(1, len(channel.json_body["media"]))
|
||||
|
||||
# Page 3
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
"/_synapse/admin/v1/media/quarantined?kind=local&from=2&limit=1",
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(1, len(channel.json_body["media"]))
|
||||
|
||||
# Page 4 (no media)
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
"/_synapse/admin/v1/media/quarantined?kind=local&from=3&limit=1",
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(0, len(channel.json_body["media"]))
|
||||
|
||||
|
||||
class QuarantineMediaByIDTestCase(_AdminMediaTests):
|
||||
def upload_media_and_return_media_id(self, data: bytes) -> str:
|
||||
# Upload some media into the room
|
||||
|
||||
@@ -2976,6 +2976,120 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(private_room_id, channel.json_body["joined_rooms"][0])
|
||||
|
||||
def test_joined_rooms(self) -> None:
|
||||
"""
|
||||
Test joined_rooms admin endpoint.
|
||||
"""
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
f"/_matrix/client/v3/join/{self.public_room_id}",
|
||||
content={"user_id": self.second_user_id},
|
||||
access_token=self.second_tok,
|
||||
)
|
||||
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(self.public_room_id, channel.json_body["room_id"])
|
||||
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/admin/v1/users/{self.second_user_id}/joined_rooms",
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(self.public_room_id, channel.json_body["joined_rooms"][0])
|
||||
|
||||
def test_memberships(self) -> None:
|
||||
"""
|
||||
Test user memberships admin endpoint.
|
||||
"""
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
f"/_matrix/client/v3/join/{self.public_room_id}",
|
||||
content={"user_id": self.second_user_id},
|
||||
access_token=self.second_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
|
||||
other_room_id = self.helper.create_room_as(
|
||||
self.admin_user, tok=self.admin_user_tok
|
||||
)
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
f"/_matrix/client/v3/join/{other_room_id}",
|
||||
content={"user_id": self.second_user_id},
|
||||
access_token=self.second_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/admin/v1/users/{self.second_user_id}/memberships",
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(
|
||||
{
|
||||
"memberships": {
|
||||
self.public_room_id: Membership.JOIN,
|
||||
other_room_id: Membership.JOIN,
|
||||
}
|
||||
},
|
||||
channel.json_body,
|
||||
)
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
f"/_matrix/client/v3/rooms/{other_room_id}/leave",
|
||||
content={"user_id": self.second_user_id},
|
||||
access_token=self.second_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
|
||||
invited_room_id = self.helper.create_room_as(
|
||||
self.admin_user, tok=self.admin_user_tok
|
||||
)
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
f"/_matrix/client/v3/rooms/{invited_room_id}/invite",
|
||||
content={"user_id": self.second_user_id},
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
|
||||
banned_room_id = self.helper.create_room_as(
|
||||
self.admin_user, tok=self.admin_user_tok
|
||||
)
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
f"/_matrix/client/v3/rooms/{banned_room_id}/ban",
|
||||
content={"user_id": self.second_user_id},
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/admin/v1/users/{self.second_user_id}/memberships",
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
self.assertEqual(
|
||||
{
|
||||
"memberships": {
|
||||
self.public_room_id: Membership.JOIN,
|
||||
other_room_id: Membership.LEAVE,
|
||||
invited_room_id: Membership.INVITE,
|
||||
banned_room_id: Membership.BAN,
|
||||
}
|
||||
},
|
||||
channel.json_body,
|
||||
)
|
||||
|
||||
def test_context_as_non_admin(self) -> None:
|
||||
"""
|
||||
Test that, without being admin, one cannot use the context admin API
|
||||
|
||||
@@ -690,7 +690,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
# Create a remote invite room without any `unsigned.invite_room_state`
|
||||
_remote_invite_room_id = self._create_remote_invite_room_for_user(
|
||||
_remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
|
||||
user1_id, None
|
||||
)
|
||||
|
||||
@@ -760,7 +760,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
|
||||
|
||||
# Create a remote invite room with some `unsigned.invite_room_state`
|
||||
# indicating that the room is encrypted.
|
||||
remote_invite_room_id = self._create_remote_invite_room_for_user(
|
||||
remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
|
||||
user1_id,
|
||||
[
|
||||
StrippedStateEvent(
|
||||
@@ -849,7 +849,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
|
||||
|
||||
# Create a remote invite room with some `unsigned.invite_room_state`
|
||||
# but don't set any room encryption event.
|
||||
remote_invite_room_id = self._create_remote_invite_room_for_user(
|
||||
remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
|
||||
user1_id,
|
||||
[
|
||||
StrippedStateEvent(
|
||||
@@ -1484,7 +1484,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
# Create a remote invite room without any `unsigned.invite_room_state`
|
||||
_remote_invite_room_id = self._create_remote_invite_room_for_user(
|
||||
_remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
|
||||
user1_id, None
|
||||
)
|
||||
|
||||
@@ -1554,7 +1554,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
|
||||
|
||||
# Create a remote invite room with some `unsigned.invite_room_state` indicating
|
||||
# that it is a space room
|
||||
remote_invite_room_id = self._create_remote_invite_room_for_user(
|
||||
remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
|
||||
user1_id,
|
||||
[
|
||||
StrippedStateEvent(
|
||||
@@ -1637,7 +1637,7 @@ class SlidingSyncFiltersTestCase(SlidingSyncBase):
|
||||
|
||||
# Create a remote invite room with some `unsigned.invite_room_state`
|
||||
# but the create event does not specify a room type (normal room)
|
||||
remote_invite_room_id = self._create_remote_invite_room_for_user(
|
||||
remote_invite_room_id, _ = self._create_remote_invite_room_for_user(
|
||||
user1_id,
|
||||
[
|
||||
StrippedStateEvent(
|
||||
|
||||
@@ -23,6 +23,7 @@ from synapse.api.constants import EventContentFields, EventTypes, JoinRules, Mem
|
||||
from synapse.handlers.sliding_sync import StateValues
|
||||
from synapse.rest.client import knock, login, room, sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.databases.main.events import DeltaState, SlidingSyncTableChanges
|
||||
from synapse.util.clock import Clock
|
||||
|
||||
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
|
||||
@@ -642,11 +643,6 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
# This appears because *some* membership in the room changed and the
|
||||
# heroes are recalculated and is thrown in because we have it. But this
|
||||
# is technically optional and not needed because we've already seen user2
|
||||
# in the last sync (and their membership hasn't changed).
|
||||
state_map[(EventTypes.Member, user2_id)],
|
||||
# Appears because there is a message in the timeline from this user
|
||||
state_map[(EventTypes.Member, user4_id)],
|
||||
# Appears because there is a membership event in the timeline from this user
|
||||
@@ -841,6 +837,437 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_lazy_loading_room_members_limited_sync(self) -> None:
|
||||
"""Test that when using lazy loading for room members and a limited sync
|
||||
missing a membership change, we include the membership change next time
|
||||
said user says something.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
# Send a message from each user to the room so that both memberships are sent down.
|
||||
self.helper.send(room_id1, "1", tok=user1_tok)
|
||||
self.helper.send(room_id1, "2", tok=user2_tok)
|
||||
|
||||
# Make a first sync with lazy loading for the room members to establish
|
||||
# a position
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 2,
|
||||
}
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# We should see both membership events in required_state
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
state_map[(EventTypes.Member, user2_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# User2 changes their display name (causing a membership change)
|
||||
self.helper.send_state(
|
||||
room_id1,
|
||||
event_type=EventTypes.Member,
|
||||
state_key=user2_id,
|
||||
body={
|
||||
EventContentFields.MEMBERSHIP: Membership.JOIN,
|
||||
EventContentFields.MEMBERSHIP_DISPLAYNAME: "New Name",
|
||||
},
|
||||
tok=user2_tok,
|
||||
)
|
||||
|
||||
# Send a couple of messages to the room to push out the membership change
|
||||
self.helper.send(room_id1, "3", tok=user1_tok)
|
||||
self.helper.send(room_id1, "4", tok=user1_tok)
|
||||
|
||||
# Make an incremental Sliding Sync request
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
|
||||
# The membership change should *not* be included yet as user2 doesn't
|
||||
# have any events in the timeline.
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1].get("required_state", []),
|
||||
set(),
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Now user2 sends a message to the room
|
||||
self.helper.send(room_id1, "5", tok=user2_tok)
|
||||
|
||||
# Make another incremental Sliding Sync request
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
|
||||
# The membership change should now be included as user2 has an event
|
||||
# in the timeline.
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1].get("required_state", []),
|
||||
{
|
||||
state_map[(EventTypes.Member, user2_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_lazy_loading_room_members_across_multiple_rooms(self) -> None:
|
||||
"""Test that lazy loading room members are tracked per-room correctly."""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
# Create two rooms with both users in them and send a message in each
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
self.helper.send(room_id1, "room1-msg1", tok=user2_tok)
|
||||
|
||||
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id2, user1_id, tok=user1_tok)
|
||||
self.helper.send(room_id2, "room2-msg1", tok=user2_tok)
|
||||
|
||||
# Make a sync with lazy loading for the room members to establish
|
||||
# a position
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# We expect to see only user2's membership in both rooms
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user2_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Send a message in room1 from user1
|
||||
self.helper.send(room_id1, "room1-msg2", tok=user1_tok)
|
||||
|
||||
# Make an incremental Sliding Sync request and check that we get user1's
|
||||
# membership.
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Send a message in room2 from user1
|
||||
self.helper.send(room_id2, "room2-msg2", tok=user1_tok)
|
||||
|
||||
# Make an incremental Sliding Sync request and check that we get user1's
|
||||
# membership.
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id2)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id2]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_lazy_loading_room_members_across_multiple_connections(self) -> None:
|
||||
"""Test that lazy loading room members are tracked per-connection
|
||||
correctly.
|
||||
|
||||
This catches bugs where if a membership got sent down one connection,
|
||||
it would incorrectly assume it was sent down another connection.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
self.helper.send(room_id1, "1", tok=user2_tok)
|
||||
|
||||
# Make a sync with lazy loading for the room members to establish
|
||||
# a position
|
||||
sync_body1 = {
|
||||
"conn_id": "first-connection",
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
},
|
||||
}
|
||||
response_body, from_token1 = self.do_sync(sync_body1, tok=user1_tok)
|
||||
|
||||
# We expect to see only user2's membership in the room
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user2_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Now make a new connection
|
||||
sync_body2 = {
|
||||
"conn_id": "second-connection",
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
},
|
||||
}
|
||||
response_body, from_token2 = self.do_sync(sync_body2, tok=user1_tok)
|
||||
|
||||
# We should see user2's membership as this is a new connection
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user2_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# If we send a message from user1 and sync again on the first connection,
|
||||
# we should get user1's membership
|
||||
self.helper.send(room_id1, "2", tok=user1_tok)
|
||||
response_body, from_token1 = self.do_sync(
|
||||
sync_body1, since=from_token1, tok=user1_tok
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# We sync again on the first connection to "ack" the position. This
|
||||
# triggers the `sliding_sync_connection_lazy_members` to set its
|
||||
# connection_position to null.
|
||||
self.do_sync(sync_body1, since=from_token1, tok=user1_tok)
|
||||
|
||||
# If we sync again on the second connection, we should also get user1's
|
||||
# membership
|
||||
response_body, _ = self.do_sync(sync_body2, since=from_token2, tok=user1_tok)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_lazy_loading_room_members_forked_position(self) -> None:
|
||||
"""Test that lazy loading room members are tracked correctly when a
|
||||
connection position is reused"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
self.helper.send(room_id1, "1", tok=user2_tok)
|
||||
|
||||
# Make a sync with lazy loading for the room members to establish
|
||||
# a position
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# We expect to see only user2's membership in the room
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user2_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Send a message in room1 from user1
|
||||
self.helper.send(room_id1, "2", tok=user1_tok)
|
||||
|
||||
# Make an incremental Sliding Sync request and check that we get user1's
|
||||
# membership.
|
||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Now, reuse the original position and check we still get user1's
|
||||
# membership.
|
||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_lazy_loading_room_members_explicit_membership_removed(self) -> None:
|
||||
"""Test the case where we requested explicit memberships and then later
|
||||
changed to lazy loading."""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
self.helper.send(room_id1, "1", tok=user2_tok)
|
||||
|
||||
# Make a sync with lazy loading for the room members to establish
|
||||
# a position
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.ME],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# We expect to see only user1's membership in the room
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Now change to lazy loading...
|
||||
sync_body["lists"]["foo-list"]["required_state"] = [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
]
|
||||
|
||||
# Send a message in room1 from user2
|
||||
self.helper.send(room_id1, "2", tok=user2_tok)
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
|
||||
# We should see user2's membership as it's in the timeline
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user2_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Now send a message in room1 from user1
|
||||
self.helper.send(room_id1, "3", tok=user1_tok)
|
||||
|
||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
|
||||
# We should not see any memberships as we've already seen user1's
|
||||
# membership.
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1].get("required_state", []),
|
||||
[],
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_rooms_required_state_me(self) -> None:
|
||||
"""
|
||||
Test `rooms.required_state` correctly handles $ME.
|
||||
@@ -1686,3 +2113,135 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
||||
# We should not see the room name again, as we have already sent that
|
||||
# down.
|
||||
self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
|
||||
|
||||
def test_lazy_loading_room_members_state_reset_non_limited_timeline(self) -> None:
|
||||
"""Test that when using lazy-loaded members, if a membership state is
|
||||
reset to a previous state and the sync is not limited, then we send down
|
||||
the state reset.
|
||||
|
||||
Regression test as previously we only returned membership relevant to
|
||||
the timeline and so did not tell clients about state resets for
|
||||
users who did not send any timeline events.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
|
||||
content = self.helper.join(room_id, user1_id, tok=user1_tok)
|
||||
first_event_id = content["event_id"]
|
||||
|
||||
# Send a message so that the user1 membership comes down sync (because we're lazy-loading room members)
|
||||
self.helper.send(room_id, "msg", tok=user1_tok)
|
||||
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# Check that user1 is returned
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# user1 changes their display name
|
||||
content = self.helper.send_state(
|
||||
room_id,
|
||||
EventTypes.Member,
|
||||
body={"membership": "join", "displayname": "New display name"},
|
||||
state_key=user1_id,
|
||||
tok=user1_tok,
|
||||
)
|
||||
second_event_id = content["event_id"]
|
||||
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
|
||||
# We should see the updated membership state
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
self.assertEqual(
|
||||
response_body["rooms"][room_id]["required_state"][0]["event_id"],
|
||||
second_event_id,
|
||||
)
|
||||
|
||||
# Now, fake a reset the membership state to the first event
|
||||
persist_event_store = self.hs.get_datastores().persist_events
|
||||
assert persist_event_store is not None
|
||||
|
||||
self.get_success(
|
||||
persist_event_store.update_current_state(
|
||||
room_id,
|
||||
DeltaState(
|
||||
to_insert={(EventTypes.Member, user1_id): first_event_id},
|
||||
to_delete=[],
|
||||
),
|
||||
# We don't need to worry about sliding sync changes for this test
|
||||
SlidingSyncTableChanges(
|
||||
room_id=room_id,
|
||||
joined_room_bump_stamp_to_fully_insert=None,
|
||||
joined_room_updates={},
|
||||
membership_snapshot_shared_insert_values={},
|
||||
to_insert_membership_snapshots=[],
|
||||
to_delete_membership_snapshots=[],
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
# Send a message from *user2* so that user1 wouldn't normally get
|
||||
# synced.
|
||||
self.helper.send(room_id, "msg2", tok=user2_tok)
|
||||
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
|
||||
# This should be a non-limited sync as there is only one timeline event
|
||||
# (<= `timeline_limit). This is important as we're specifically testing the non-`limited`
|
||||
# timeline scenario. And for reference, we don't send down state resets
|
||||
# on limited timelines when using lazy loaded memberships.
|
||||
self.assertFalse(
|
||||
response_body["rooms"][room_id].get("limited", False),
|
||||
"Expected a non-limited timeline",
|
||||
)
|
||||
|
||||
# We should see the reset membership state of user1
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
)
|
||||
self.assertEqual(
|
||||
response_body["rooms"][room_id]["required_state"][0]["event_id"],
|
||||
first_event_id,
|
||||
)
|
||||
|
||||
@@ -257,7 +257,7 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
|
||||
invitee_user_id: str,
|
||||
unsigned_invite_room_state: list[StrippedStateEvent] | None,
|
||||
invite_room_id: str | None = None,
|
||||
) -> str:
|
||||
) -> tuple[str, EventBase]:
|
||||
"""
|
||||
Create a fake invite for a remote room and persist it.
|
||||
|
||||
@@ -323,11 +323,13 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
|
||||
context = EventContext.for_outlier(self.hs.get_storage_controllers())
|
||||
persist_controller = self.hs.get_storage_controllers().persistence
|
||||
assert persist_controller is not None
|
||||
self.get_success(persist_controller.persist_event(invite_event, context))
|
||||
persisted_event, _, _ = self.get_success(
|
||||
persist_controller.persist_event(invite_event, context)
|
||||
)
|
||||
|
||||
self._remote_invite_count += 1
|
||||
|
||||
return invite_room_id
|
||||
return invite_room_id, persisted_event
|
||||
|
||||
def _bump_notifier_wait_for_events(
|
||||
self,
|
||||
@@ -763,7 +765,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
# Create a remote room invite (out-of-band membership)
|
||||
room_id = self._create_remote_invite_room_for_user(user1_id, None)
|
||||
room_id, _ = self._create_remote_invite_room_for_user(user1_id, None)
|
||||
|
||||
# Make the Sliding Sync request
|
||||
sync_body = {
|
||||
|
||||
@@ -55,12 +55,16 @@ class UserMutualRoomsTest(unittest.HomeserverTestCase):
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.store = hs.get_datastores().main
|
||||
mutual_rooms.MUTUAL_ROOMS_BATCH_LIMIT = 10
|
||||
|
||||
def _get_mutual_rooms(self, token: str, other_user: str) -> FakeChannel:
|
||||
def _get_mutual_rooms(
|
||||
self, token: str, other_user: str, since_token: str | None = None
|
||||
) -> FakeChannel:
|
||||
return self.make_request(
|
||||
"GET",
|
||||
"/_matrix/client/unstable/uk.half-shot.msc2666/user/mutual_rooms"
|
||||
f"?user_id={quote(other_user)}",
|
||||
f"?user_id={quote(other_user)}"
|
||||
+ (f"&from={quote(since_token)}" if since_token else ""),
|
||||
access_token=token,
|
||||
)
|
||||
|
||||
@@ -141,6 +145,52 @@ class UserMutualRoomsTest(unittest.HomeserverTestCase):
|
||||
for room_id_id in channel.json_body["joined"]:
|
||||
self.assertIn(room_id_id, [room_id_one, room_id_two])
|
||||
|
||||
def _create_rooms_for_pagination_test(
|
||||
self, count: int
|
||||
) -> tuple[str, str, list[str]]:
|
||||
u1 = self.register_user("user1", "pass")
|
||||
u1_token = self.login(u1, "pass")
|
||||
u2 = self.register_user("user2", "pass")
|
||||
u2_token = self.login(u2, "pass")
|
||||
room_ids = []
|
||||
for i in range(count):
|
||||
room_id = self.helper.create_room_as(u1, is_public=i % 2 == 0, tok=u1_token)
|
||||
self.helper.invite(room_id, src=u1, targ=u2, tok=u1_token)
|
||||
self.helper.join(room_id, user=u2, tok=u2_token)
|
||||
room_ids.append(room_id)
|
||||
room_ids.sort()
|
||||
return u1_token, u2, room_ids
|
||||
|
||||
def test_shared_room_list_pagination_two_pages(self) -> None:
|
||||
u1_token, u2, room_ids = self._create_rooms_for_pagination_test(15)
|
||||
|
||||
channel = self._get_mutual_rooms(u1_token, u2)
|
||||
self.assertEqual(200, channel.code, channel.result)
|
||||
self.assertEqual(channel.json_body["joined"], room_ids[0:10])
|
||||
self.assertIn("next_batch", channel.json_body)
|
||||
|
||||
channel = self._get_mutual_rooms(u1_token, u2, channel.json_body["next_batch"])
|
||||
self.assertEqual(200, channel.code, channel.result)
|
||||
self.assertEqual(channel.json_body["joined"], room_ids[10:20])
|
||||
self.assertNotIn("next_batch", channel.json_body)
|
||||
|
||||
def test_shared_room_list_pagination_one_page(self) -> None:
|
||||
u1_token, u2, room_ids = self._create_rooms_for_pagination_test(10)
|
||||
|
||||
channel = self._get_mutual_rooms(u1_token, u2)
|
||||
self.assertEqual(200, channel.code, channel.result)
|
||||
self.assertEqual(channel.json_body["joined"], room_ids)
|
||||
self.assertNotIn("next_batch", channel.json_body)
|
||||
|
||||
def test_shared_room_list_pagination_invalid_token(self) -> None:
|
||||
u1_token, u2, room_ids = self._create_rooms_for_pagination_test(10)
|
||||
|
||||
channel = self._get_mutual_rooms(u1_token, u2, "!<>##faketoken")
|
||||
self.assertEqual(400, channel.code, channel.result)
|
||||
self.assertEqual(
|
||||
"M_INVALID_PARAM", channel.json_body["errcode"], channel.result
|
||||
)
|
||||
|
||||
def test_shared_room_list_after_leave(self) -> None:
|
||||
"""
|
||||
A room should no longer be considered shared if the other
|
||||
@@ -172,3 +222,14 @@ class UserMutualRoomsTest(unittest.HomeserverTestCase):
|
||||
channel = self._get_mutual_rooms(u2_token, u1)
|
||||
self.assertEqual(200, channel.code, channel.result)
|
||||
self.assertEqual(len(channel.json_body["joined"]), 0)
|
||||
|
||||
def test_shared_room_list_nonexistent_user(self) -> None:
|
||||
u1 = self.register_user("user1", "pass")
|
||||
u1_token = self.login(u1, "pass")
|
||||
|
||||
# Check shared rooms from user1's perspective.
|
||||
# We should see the one room in common
|
||||
channel = self._get_mutual_rooms(u1_token, "@meow:example.com")
|
||||
self.assertEqual(200, channel.code, channel.result)
|
||||
self.assertEqual(len(channel.json_body["joined"]), 0)
|
||||
self.assertNotIn("next_batch", channel.json_body)
|
||||
|
||||
@@ -147,7 +147,7 @@ class FakeChannel:
|
||||
_reactor: MemoryReactorClock
|
||||
result: dict = attr.Factory(dict)
|
||||
_ip: str = "127.0.0.1"
|
||||
_producer: IPullProducer | IPushProducer | None = None
|
||||
_producer: Optional[Union[IPullProducer, IPushProducer]] = None
|
||||
resource_usage: ContextResourceUsage | None = None
|
||||
_request: Request | None = None
|
||||
|
||||
@@ -248,7 +248,7 @@ class FakeChannel:
|
||||
# TODO This should ensure that the IProducer is an IPushProducer or
|
||||
# IPullProducer, unfortunately twisted.protocols.basic.FileSender does
|
||||
# implement those, but doesn't declare it.
|
||||
self._producer = cast(IPushProducer | IPullProducer, producer)
|
||||
self._producer = cast(Union[IPushProducer, IPullProducer], producer)
|
||||
self.producerStreaming = streaming
|
||||
|
||||
def _produce() -> None:
|
||||
@@ -852,7 +852,7 @@ class FakeTransport:
|
||||
"""Test reactor
|
||||
"""
|
||||
|
||||
_protocol: IProtocol | None = None
|
||||
_protocol: Optional[IProtocol] = None
|
||||
"""The Protocol which is producing data for this transport. Optional, but if set
|
||||
will get called back for connectionLost() notifications etc.
|
||||
"""
|
||||
@@ -871,7 +871,7 @@ class FakeTransport:
|
||||
disconnected = False
|
||||
connected = True
|
||||
buffer: bytes = b""
|
||||
producer: IPushProducer | None = None
|
||||
producer: Optional[IPushProducer] = None
|
||||
autoflush: bool = True
|
||||
|
||||
def getPeer(self) -> IPv4Address | IPv6Address:
|
||||
@@ -1073,7 +1073,7 @@ def setup_test_homeserver(
|
||||
cleanup_func: Callable[[Callable[[], Optional["Deferred[None]"]]], None],
|
||||
server_name: str = "test",
|
||||
config: HomeServerConfig | None = None,
|
||||
reactor: ISynapseReactor | None = None,
|
||||
reactor: Optional[ISynapseReactor] = None,
|
||||
homeserver_to_use: type[HomeServer] = TestHomeServer,
|
||||
db_txn_limit: int | None = None,
|
||||
**extra_homeserver_attributes: Any,
|
||||
|
||||
@@ -30,19 +30,23 @@ from synapse.api.room_versions import RoomVersions
|
||||
from synapse.events import EventBase, StrippedStateEvent, make_event_from_dict
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import login, room
|
||||
from synapse.rest.client import login, room, sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.databases.main.events import DeltaState
|
||||
from synapse.storage.databases.main.events_bg_updates import (
|
||||
_resolve_stale_data_in_sliding_sync_joined_rooms_table,
|
||||
_resolve_stale_data_in_sliding_sync_membership_snapshots_table,
|
||||
)
|
||||
from synapse.types import create_requester
|
||||
from synapse.types import SlidingSyncStreamToken, create_requester
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
LAZY_MEMBERS_UPDATE_INTERVAL,
|
||||
StateValues,
|
||||
)
|
||||
from synapse.types.storage import _BackgroundUpdates
|
||||
from synapse.util.clock import Clock
|
||||
|
||||
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
|
||||
from tests.test_utils.event_injection import create_event
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -86,7 +90,7 @@ class _SlidingSyncMembershipSnapshotResult:
|
||||
forgotten: bool = False
|
||||
|
||||
|
||||
class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
|
||||
class SlidingSyncTablesTestCaseBase(SlidingSyncBase):
|
||||
"""
|
||||
Helpers to deal with testing that the
|
||||
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` database tables are
|
||||
@@ -97,6 +101,7 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
|
||||
admin.register_servlets,
|
||||
login.register_servlets,
|
||||
room.register_servlets,
|
||||
sync.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
@@ -202,78 +207,6 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
|
||||
for row in rows
|
||||
}
|
||||
|
||||
_remote_invite_count: int = 0
|
||||
|
||||
def _create_remote_invite_room_for_user(
|
||||
self,
|
||||
invitee_user_id: str,
|
||||
unsigned_invite_room_state: list[StrippedStateEvent] | None,
|
||||
) -> tuple[str, EventBase]:
|
||||
"""
|
||||
Create a fake invite for a remote room and persist it.
|
||||
|
||||
We don't have any state for these kind of rooms and can only rely on the
|
||||
stripped state included in the unsigned portion of the invite event to identify
|
||||
the room.
|
||||
|
||||
Args:
|
||||
invitee_user_id: The person being invited
|
||||
unsigned_invite_room_state: List of stripped state events to assist the
|
||||
receiver in identifying the room.
|
||||
|
||||
Returns:
|
||||
The room ID of the remote invite room and the persisted remote invite event.
|
||||
"""
|
||||
invite_room_id = f"!test_room{self._remote_invite_count}:remote_server"
|
||||
|
||||
invite_event_dict = {
|
||||
"room_id": invite_room_id,
|
||||
"sender": "@inviter:remote_server",
|
||||
"state_key": invitee_user_id,
|
||||
"depth": 1,
|
||||
"origin_server_ts": 1,
|
||||
"type": EventTypes.Member,
|
||||
"content": {"membership": Membership.INVITE},
|
||||
"auth_events": [],
|
||||
"prev_events": [],
|
||||
}
|
||||
if unsigned_invite_room_state is not None:
|
||||
serialized_stripped_state_events = []
|
||||
for stripped_event in unsigned_invite_room_state:
|
||||
serialized_stripped_state_events.append(
|
||||
{
|
||||
"type": stripped_event.type,
|
||||
"state_key": stripped_event.state_key,
|
||||
"sender": stripped_event.sender,
|
||||
"content": stripped_event.content,
|
||||
}
|
||||
)
|
||||
|
||||
invite_event_dict["unsigned"] = {
|
||||
"invite_room_state": serialized_stripped_state_events
|
||||
}
|
||||
|
||||
invite_event = make_event_from_dict(
|
||||
invite_event_dict,
|
||||
room_version=RoomVersions.V10,
|
||||
)
|
||||
invite_event.internal_metadata.outlier = True
|
||||
invite_event.internal_metadata.out_of_band_membership = True
|
||||
|
||||
self.get_success(
|
||||
self.store.maybe_store_room_on_outlier_membership(
|
||||
room_id=invite_room_id, room_version=invite_event.room_version
|
||||
)
|
||||
)
|
||||
context = EventContext.for_outlier(self.hs.get_storage_controllers())
|
||||
persisted_event, _, _ = self.get_success(
|
||||
self.persist_controller.persist_event(invite_event, context)
|
||||
)
|
||||
|
||||
self._remote_invite_count += 1
|
||||
|
||||
return invite_room_id, persisted_event
|
||||
|
||||
def _retract_remote_invite_for_user(
|
||||
self,
|
||||
user_id: str,
|
||||
@@ -3052,6 +2985,141 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_lazy_loading_room_members_last_seen_ts(self) -> None:
|
||||
"""Test that the `last_seen_ts` column in
|
||||
`sliding_sync_connection_lazy_members` is correctly kept up to date.
|
||||
|
||||
We expect that it only gets updated every
|
||||
`LAZY_MEMBERS_UPDATE_INTERVAL`, rather than on every sync.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
|
||||
self.helper.join(room_id, user1_id, tok=user1_tok)
|
||||
|
||||
# Send a message so that user1 comes down sync.
|
||||
self.helper.send(room_id, "msg", tok=user1_tok)
|
||||
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# Check that user1 is returned
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Check that we have an entry in sliding_sync_connection_lazy_members
|
||||
connection_pos1 = self.get_success(
|
||||
SlidingSyncStreamToken.from_string(self.store, from_token)
|
||||
).connection_position
|
||||
lazy_member_entries = self.get_success(
|
||||
self.store.get_sliding_sync_connection_lazy_members(
|
||||
connection_pos1, room_id, {user1_id}
|
||||
)
|
||||
)
|
||||
self.assertIn(user1_id, lazy_member_entries)
|
||||
|
||||
prev_timestamp = lazy_member_entries[user1_id]
|
||||
|
||||
# If user1 sends a message then we consider it for lazy loading. We have
|
||||
# previously returned it so we don't send the state down again, but it
|
||||
# is still eligible for updating the timestamp. Since we last updated
|
||||
# the timestamp within the last `LAZY_MEMBERS_UPDATE_INTERVAL`, we do not
|
||||
# update it.
|
||||
self.helper.send(room_id, "msg2", tok=user1_tok)
|
||||
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
|
||||
# We expect the required_state map to be empty as nothing has changed.
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id].get("required_state", []),
|
||||
{},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
connection_pos2 = self.get_success(
|
||||
SlidingSyncStreamToken.from_string(self.store, from_token)
|
||||
).connection_position
|
||||
|
||||
lazy_member_entries = self.get_success(
|
||||
self.store.get_sliding_sync_connection_lazy_members(
|
||||
connection_pos2, room_id, {user1_id}
|
||||
)
|
||||
)
|
||||
|
||||
# The timestamp should be unchanged.
|
||||
self.assertEqual(lazy_member_entries[user1_id], prev_timestamp)
|
||||
|
||||
# Now advance the time by `LAZY_MEMBERS_UPDATE_INTERVAL` so that we
|
||||
# would update the timestamp.
|
||||
self.reactor.advance(LAZY_MEMBERS_UPDATE_INTERVAL.as_secs())
|
||||
|
||||
# Send a message from user2
|
||||
self.helper.send(room_id, "msg3", tok=user2_tok)
|
||||
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
|
||||
connection_pos3 = self.get_success(
|
||||
SlidingSyncStreamToken.from_string(self.store, from_token)
|
||||
).connection_position
|
||||
|
||||
lazy_member_entries = self.get_success(
|
||||
self.store.get_sliding_sync_connection_lazy_members(
|
||||
connection_pos3, room_id, {user1_id}
|
||||
)
|
||||
)
|
||||
|
||||
# The timestamp for user1 should be unchanged, as they were not sent down.
|
||||
self.assertEqual(lazy_member_entries[user1_id], prev_timestamp)
|
||||
|
||||
# Now if user1 sends a message, then the timestamp should be updated as
|
||||
# its been over `LAZY_MEMBERS_UPDATE_INTERVAL` since we last updated it.
|
||||
# (Even though we don't send the state down again).
|
||||
self.helper.send(room_id, "msg4", tok=user1_tok)
|
||||
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
connection_pos4 = self.get_success(
|
||||
SlidingSyncStreamToken.from_string(self.store, from_token)
|
||||
).connection_position
|
||||
|
||||
lazy_member_entries = self.get_success(
|
||||
self.store.get_sliding_sync_connection_lazy_members(
|
||||
connection_pos4, room_id, {user1_id}
|
||||
)
|
||||
)
|
||||
# The timestamp for user1 should be updated.
|
||||
self.assertGreater(lazy_member_entries[user1_id], prev_timestamp)
|
||||
|
||||
|
||||
class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
|
||||
"""
|
||||
|
||||
@@ -37,6 +37,7 @@ from typing import (
|
||||
Iterable,
|
||||
Mapping,
|
||||
NoReturn,
|
||||
Optional,
|
||||
Protocol,
|
||||
TypeVar,
|
||||
)
|
||||
@@ -636,7 +637,7 @@ class HomeserverTestCase(TestCase):
|
||||
self,
|
||||
server_name: str | None = None,
|
||||
config: JsonDict | None = None,
|
||||
reactor: ISynapseReactor | None = None,
|
||||
reactor: Optional[ISynapseReactor] = None,
|
||||
clock: Clock | None = None,
|
||||
**extra_homeserver_attributes: Any,
|
||||
) -> HomeServer:
|
||||
|
||||
@@ -198,7 +198,9 @@ def default_config(
|
||||
"rc_invites": {
|
||||
"per_room": {"per_second": 10000, "burst_count": 10000},
|
||||
"per_user": {"per_second": 10000, "burst_count": 10000},
|
||||
"per_issuer": {"per_second": 10000, "burst_count": 10000},
|
||||
},
|
||||
"rc_room_creation": {"per_second": 10000, "burst_count": 10000},
|
||||
"rc_3pid_validation": {"per_second": 10000, "burst_count": 10000},
|
||||
"rc_presence": {"per_user": {"per_second": 10000, "burst_count": 10000}},
|
||||
"saml2_enabled": False,
|
||||
|
||||
Reference in New Issue
Block a user