Compare commits

..

5 Commits

Author SHA1 Message Date
Andrew Morgan
6c292dc4ee 1.138.2 2025-09-24 12:26:49 +01:00
Andrew Morgan
120389b077 Note ubuntu release support update in the upgrade notes 2025-09-24 12:25:41 +01:00
Andrew Morgan
71b34b3a07 Drop support for Ubuntu 24.10 'Oracular Oriole', add support for Ubuntu 25.04 'Plucky Puffin' (#18962) 2025-09-24 12:24:32 +01:00
Andrew Morgan
0fbf296c99 1.138.1 2025-09-24 11:32:48 +01:00
Andrew Morgan
0c8594c9a8 Fix performance regression related to delayed events processing (#18926) 2025-09-24 11:30:47 +01:00
85 changed files with 1128 additions and 1016 deletions

View File

@@ -24,7 +24,7 @@ jobs:
mdbook-version: '0.4.17'
- name: Setup python
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"

View File

@@ -64,7 +64,7 @@ jobs:
run: echo 'window.SYNAPSE_VERSION = "${{ needs.pre.outputs.branch-version }}";' > ./docs/website_files/version.js
- name: Setup python
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"

View File

@@ -93,7 +93,7 @@ jobs:
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_INITDB_ARGS="--lc-collate C --lc-ctype C --encoding UTF8" \
postgres:${{ matrix.postgres-version }}
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- run: pip install .[all,test]
@@ -209,7 +209,7 @@ jobs:
- name: Prepare Complement's Prerequisites
run: synapse/.ci/scripts/setup_complement_prerequisites.sh
- uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
- uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # v5.5.0
with:
cache-dependency-path: complement/go.sum
go-version-file: complement/go.mod

View File

@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: '3.x'
- run: pip install tomli

View File

@@ -28,7 +28,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- id: set-distros
@@ -74,7 +74,7 @@ jobs:
${{ runner.os }}-buildx-
- name: Set up python
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
@@ -134,7 +134,7 @@ jobs:
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
# setup-python@v4 doesn't impose a default python version. Need to use 3.x
# here, because `python` on osx points to Python 2.7.
@@ -166,7 +166,7 @@ jobs:
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.10"

View File

@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- name: Install check-jsonschema
@@ -41,7 +41,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- name: Install PyYAML

View File

@@ -107,7 +107,7 @@ jobs:
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
@@ -117,7 +117,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- run: .ci/scripts/check_lockfile.py
@@ -199,7 +199,7 @@ jobs:
with:
ref: ${{ github.event.pull_request.head.sha }}
fetch-depth: 0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- run: "pip install 'towncrier>=18.6.0rc1'"
@@ -327,7 +327,7 @@ jobs:
if: ${{ needs.changes.outputs.linting_readme == 'true' }}
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- run: "pip install rstcheck"
@@ -377,7 +377,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- id: get-matrix
@@ -468,7 +468,7 @@ jobs:
sudo apt-get -qq install build-essential libffi-dev python3-dev \
libxml2-dev libxslt-dev xmlsec1 zlib1g-dev libjpeg-dev libwebp-dev
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: '3.9'
@@ -727,7 +727,7 @@ jobs:
- name: Prepare Complement's Prerequisites
run: synapse/.ci/scripts/setup_complement_prerequisites.sh
- uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
- uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # v5.5.0
with:
cache-dependency-path: complement/go.sum
go-version-file: complement/go.mod

View File

@@ -182,7 +182,7 @@ jobs:
- name: Prepare Complement's Prerequisites
run: synapse/.ci/scripts/setup_complement_prerequisites.sh
- uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
- uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # v5.5.0
with:
cache-dependency-path: complement/go.sum
go-version-file: complement/go.mod

View File

@@ -1,3 +1,21 @@
# Synapse 1.138.2 (2025-09-24)
## Internal Changes
- Drop support for Ubuntu 24.10 Oracular Oriole, and add support for Ubuntu 25.04 Plucky Puffin. ([\#18962](https://github.com/element-hq/synapse/issues/18962))
# Synapse 1.138.1 (2025-09-24)
## Bugfixes
- Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature. ([\#18926](https://github.com/element-hq/synapse/issues/18926))
# Synapse 1.138.0 (2025-09-09)
No significant changes since 1.138.0rc1.

4
Cargo.lock generated
View File

@@ -753,9 +753,9 @@ checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956"
[[package]]
name = "log"
version = "0.4.28"
version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "lru-slab"

View File

@@ -1 +0,0 @@
Remove obsolete and experimental `/sync/e2ee` endpoint.

View File

@@ -1 +0,0 @@
Fix `LaterGauge` metrics to collect from all servers.

View File

@@ -1 +0,0 @@
Configure Synapse to run MSC4306: Thread Subscriptions Complement tests.

View File

@@ -1 +0,0 @@
Fix bug where we did not send invite revocations over federation.

View File

@@ -1 +0,0 @@
Update push rules for experimental [MSC4306: Thread Subscriptions](https://github.com/matrix-org/matrix-doc/issues/4306) to follow newer draft.

View File

@@ -1 +0,0 @@
Use the `Enum`'s value for the dictionary key when responding to an admin request for experimental features.

View File

@@ -1 +0,0 @@
Fix prefixed support for MSC4133.

View File

@@ -1 +0,0 @@
Suppress "Applying schema" log noise bulk when `SYNAPSE_LOG_TESTING` is set.

View File

@@ -1 +0,0 @@
Start background tasks after we fork the process (daemonize).

View File

@@ -1 +0,0 @@
Better explain how we manage the logcontext in `run_in_background(...)` and `run_as_background_process(...)`.

View File

@@ -1 +0,0 @@
Replace usages of the deprecated `pkg_resources` interface in preparation of setuptools dropping it soon.

12
debian/changelog vendored
View File

@@ -1,3 +1,15 @@
matrix-synapse-py3 (1.138.2) stable; urgency=medium
* New Synapse release 1.138.2.
-- Synapse Packaging team <packages@matrix.org> Wed, 24 Sep 2025 12:26:16 +0100
matrix-synapse-py3 (1.138.1) stable; urgency=medium
* New Synapse release 1.138.1.
-- Synapse Packaging team <packages@matrix.org> Wed, 24 Sep 2025 11:32:38 +0100
matrix-synapse-py3 (1.138.0) stable; urgency=medium
* New Synapse release 1.138.0.

View File

@@ -133,8 +133,6 @@ experimental_features:
msc3984_appservice_key_query: true
# Invite filtering
msc4155_enabled: true
# Thread Subscriptions
msc4306_enabled: true
server_notices:
system_mxid_localpart: _server

View File

@@ -77,13 +77,6 @@ loggers:
#}
synapse.visibility.filtered_event_debug:
level: DEBUG
{#
If Synapse is under test, we don't care about seeing the "Applying schema" log
lines at the INFO level every time we run the tests (it's 100 lines of bulk)
#}
synapse.storage.prepare_database:
level: WARN
{% endif %}
root:

View File

@@ -59,28 +59,6 @@ def do_request_handling():
logger.debug("phew")
```
### The `sentinel` context
The default logcontext is `synapse.logging.context.SENTINEL_CONTEXT`, which is an empty
sentinel value to represent the root logcontext. This is what is used when there is no
other logcontext set. The phrase "clear/reset the logcontext" means to set the current
logcontext to the `sentinel` logcontext.
No CPU/database usage metrics are recorded against the `sentinel` logcontext.
Ideally, nothing from the Synapse homeserver would be logged against the `sentinel`
logcontext as we want to know which server the logs came from. In practice, this is not
always the case yet especially outside of request handling.
Global things outside of Synapse (e.g. Twisted reactor code) should run in the
`sentinel` logcontext. It's only when it calls into application code that a logcontext
gets activated. This means the reactor should be started in the `sentinel` logcontext,
and any time an awaitable yields control back to the reactor, it should reset the
logcontext to be the `sentinel` logcontext. This is important to avoid leaking the
current logcontext to the reactor (which would then get picked up and associated with
the next thing the reactor does).
## Using logcontexts with awaitables
Awaitables break the linear flow of code so that there is no longer a single entry point

View File

@@ -35,7 +35,7 @@ handlers:
loggers:
synapse:
level: INFO
handlers: [file]
handlers: [remote]
synapse.storage.SQL:
level: WARNING
```

View File

@@ -117,6 +117,16 @@ each upgrade are complete before moving on to the next upgrade, to avoid
stacking them up. You can monitor the currently running background updates with
[the Admin API](usage/administration/admin_api/background_updates.html#status).
# Upgrading to v1.138.1
## Drop support for Ubuntu 24.10 Oracular Oriole, and add support for Ubuntu 25.04 Plucky Puffin
Ubuntu 24.10 Oracular Oriole [has been end-of-life since 10 Jul
2025](https://endoflife.date/ubuntu). This release drops support for Ubuntu
24.10, and in its place adds support for Ubuntu 25.04 Plucky Puffin.
This notice also applies to the v1.139.0 release.
# Upgrading to v1.136.0
## Deprecate `run_as_background_process` exported as part of the module API interface in favor of `ModuleApi.run_as_background_process`

26
poetry.lock generated
View File

@@ -919,14 +919,14 @@ i18n = ["Babel (>=2.7)"]
[[package]]
name = "jsonschema"
version = "4.25.1"
version = "4.25.0"
description = "An implementation of JSON Schema validation for Python"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "jsonschema-4.25.1-py3-none-any.whl", hash = "sha256:3fba0169e345c7175110351d456342c364814cfcf3b964ba4587f22915230a63"},
{file = "jsonschema-4.25.1.tar.gz", hash = "sha256:e4a9655ce0da0c0b67a085847e00a3a51449e1157f4f75e9fb5aa545e122eb85"},
{file = "jsonschema-4.25.0-py3-none-any.whl", hash = "sha256:24c2e8da302de79c8b9382fee3e76b355e44d2a4364bb207159ce10b517bd716"},
{file = "jsonschema-4.25.0.tar.gz", hash = "sha256:e63acf5c11762c0e6672ffb61482bdf57f0876684d8d249c0fe2d730d48bc55f"},
]
[package.dependencies]
@@ -1531,14 +1531,14 @@ files = [
[[package]]
name = "phonenumbers"
version = "9.0.13"
version = "9.0.12"
description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers."
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "phonenumbers-9.0.13-py2.py3-none-any.whl", hash = "sha256:b97661e177773e7509c6d503e0f537cd0af22aa3746231654590876eb9430915"},
{file = "phonenumbers-9.0.13.tar.gz", hash = "sha256:eca06e01382412c45316868f86a44bb217c02f9ee7196589041556a2f54a7639"},
{file = "phonenumbers-9.0.12-py2.py3-none-any.whl", hash = "sha256:900633afc3e12191458d710262df5efc117838bd1e2e613b64fa254a86bb20a1"},
{file = "phonenumbers-9.0.12.tar.gz", hash = "sha256:ccadff6b949494bd606836d8c9678bee5b55cb1cbad1e98bf7adae108e6fd0be"},
]
[[package]]
@@ -2747,14 +2747,14 @@ files = [
[[package]]
name = "towncrier"
version = "25.8.0"
version = "24.8.0"
description = "Building newsfiles for your project."
optional = false
python-versions = ">=3.9"
python-versions = ">=3.8"
groups = ["dev"]
files = [
{file = "towncrier-25.8.0-py3-none-any.whl", hash = "sha256:b953d133d98f9aeae9084b56a3563fd2519dfc6ec33f61c9cd2c61ff243fb513"},
{file = "towncrier-25.8.0.tar.gz", hash = "sha256:eef16d29f831ad57abb3ae32a0565739866219f1ebfbdd297d32894eb9940eb1"},
{file = "towncrier-24.8.0-py3-none-any.whl", hash = "sha256:9343209592b839209cdf28c339ba45792fbfe9775b5f9c177462fd693e127d8d"},
{file = "towncrier-24.8.0.tar.gz", hash = "sha256:013423ee7eed102b2f393c287d22d95f66f1a3ea10a4baa82d298001a7f18af3"},
]
[package.dependencies]
@@ -3011,14 +3011,14 @@ files = [
[[package]]
name = "types-requests"
version = "2.32.4.20250809"
version = "2.32.4.20250611"
description = "Typing stubs for requests"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
files = [
{file = "types_requests-2.32.4.20250809-py3-none-any.whl", hash = "sha256:f73d1832fb519ece02c85b1f09d5f0dd3108938e7d47e7f94bbfa18a6782b163"},
{file = "types_requests-2.32.4.20250809.tar.gz", hash = "sha256:d8060de1c8ee599311f56ff58010fb4902f462a1470802cf9f6ed27bc46c4df3"},
{file = "types_requests-2.32.4.20250611-py3-none-any.whl", hash = "sha256:ad2fe5d3b0cb3c2c902c8815a70e7fb2302c4b8c1f77bdcd738192cdb3878072"},
{file = "types_requests-2.32.4.20250611.tar.gz", hash = "sha256:741c8777ed6425830bf51e54d6abe245f79b4dcb9019f1622b773463946bf826"},
]
[package.dependencies]

View File

@@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.138.0"
version = "1.138.2"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"

View File

@@ -289,10 +289,10 @@ pub const BASE_APPEND_CONTENT_RULES: &[PushRule] = &[PushRule {
default_enabled: true,
}];
pub const BASE_APPEND_POSTCONTENT_RULES: &[PushRule] = &[
pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
PushRule {
rule_id: Cow::Borrowed("global/postcontent/.io.element.msc4306.rule.unsubscribed_thread"),
priority_class: 6,
rule_id: Cow::Borrowed("global/content/.io.element.msc4306.rule.unsubscribed_thread"),
priority_class: 1,
conditions: Cow::Borrowed(&[Condition::Known(
KnownCondition::Msc4306ThreadSubscription { subscribed: false },
)]),
@@ -301,8 +301,8 @@ pub const BASE_APPEND_POSTCONTENT_RULES: &[PushRule] = &[
default_enabled: true,
},
PushRule {
rule_id: Cow::Borrowed("global/postcontent/.io.element.msc4306.rule.subscribed_thread"),
priority_class: 6,
rule_id: Cow::Borrowed("global/content/.io.element.msc4306.rule.subscribed_thread"),
priority_class: 1,
conditions: Cow::Borrowed(&[Condition::Known(
KnownCondition::Msc4306ThreadSubscription { subscribed: true },
)]),
@@ -310,9 +310,6 @@ pub const BASE_APPEND_POSTCONTENT_RULES: &[PushRule] = &[
default: true,
default_enabled: true,
},
];
pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
PushRule {
rule_id: Cow::Borrowed("global/underride/.m.rule.call"),
priority_class: 1,
@@ -729,7 +726,6 @@ lazy_static! {
.iter()
.chain(BASE_APPEND_OVERRIDE_RULES.iter())
.chain(BASE_APPEND_CONTENT_RULES.iter())
.chain(BASE_APPEND_POSTCONTENT_RULES.iter())
.chain(BASE_APPEND_UNDERRIDE_RULES.iter())
.map(|rule| { (&*rule.rule_id, rule) })
.collect();

View File

@@ -527,7 +527,6 @@ impl PushRules {
.chain(base_rules::BASE_APPEND_OVERRIDE_RULES.iter())
.chain(self.content.iter())
.chain(base_rules::BASE_APPEND_CONTENT_RULES.iter())
.chain(base_rules::BASE_APPEND_POSTCONTENT_RULES.iter())
.chain(self.room.iter())
.chain(self.sender.iter())
.chain(self.underride.iter())

View File

@@ -32,7 +32,7 @@ DISTS = (
"debian:sid", # (rolling distro, no EOL)
"ubuntu:jammy", # 22.04 LTS (EOL 2027-04) (our EOL forced by Python 3.10 is 2026-10-04)
"ubuntu:noble", # 24.04 LTS (EOL 2029-06)
"ubuntu:oracular", # 24.10 (EOL 2025-07)
"ubuntu:plucky", # 25.04 (EOL 2026-01)
"debian:trixie", # (EOL not specified yet)
)

View File

@@ -230,7 +230,6 @@ test_packages=(
./tests/msc3967
./tests/msc4140
./tests/msc4155
./tests/msc4306
)
# Enable dirty runs, so tests will reuse the same container where possible.

View File

@@ -153,13 +153,9 @@ def get_registered_paths_for_default(
"""
hs = MockHomeserver(base_config, worker_app)
# TODO We only do this to avoid an error, but don't need the database etc
hs.setup()
registered_paths = get_registered_paths_for_hs(hs)
hs.cleanup()
return registered_paths
return get_registered_paths_for_hs(hs)
def elide_http_methods_if_unconflicting(

View File

@@ -99,7 +99,6 @@ from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.types import ISynapseReactor
from synapse.util import SYNAPSE_VERSION, Clock
from synapse.util.stringutils import random_string
# Cast safety: Twisted does some naughty magic which replaces the
# twisted.internet.reactor module with a Reactor instance at runtime.
@@ -324,7 +323,6 @@ class MockHomeserver:
self.config = config
self.hostname = config.server.server_name
self.version_string = SYNAPSE_VERSION
self.instance_id = random_string(5)
def get_clock(self) -> Clock:
return self.clock
@@ -332,9 +330,6 @@ class MockHomeserver:
def get_reactor(self) -> ISynapseReactor:
return reactor
def get_instance_id(self) -> str:
return self.instance_id
def get_instance_name(self) -> str:
return "master"
@@ -690,15 +685,7 @@ class Porter:
)
prepare_database(db_conn, engine, config=self.hs_config)
# Type safety: ignore that we're using Mock homeservers here.
store = Store(
DatabasePool(
hs, # type: ignore[arg-type]
db_config,
engine,
),
db_conn,
hs, # type: ignore[arg-type]
)
store = Store(DatabasePool(hs, db_config, engine), db_conn, hs) # type: ignore[arg-type]
db_conn.commit()
return store

View File

@@ -120,13 +120,6 @@ def main() -> None:
# DB.
hs.setup()
# This will cause all of the relevant storage classes to be instantiated and call
# `register_background_update_handler(...)`,
# `register_background_index_update(...)`,
# `register_background_validate_constraint(...)`, etc so they are available to use
# if we are asked to run those background updates.
hs.get_storage_controllers()
if args.run_background_updates:
run_background_updates(hs)

View File

@@ -609,28 +609,10 @@ async def start(hs: "HomeServer") -> None:
setup_sentry(hs)
setup_sdnotify(hs)
# Register background tasks required by this server. This must be done
# somewhat manually due to the background tasks not being registered
# unless handlers are instantiated.
#
# While we could "start" these before the reactor runs, nothing will happen until
# the reactor is running, so we may as well do it here in `start`.
#
# Additionally, this means we also start them after we daemonize and fork the
# process which means we can avoid any potential problems with cputime metrics
# getting confused about the per-thread resource usage appearing to go backwards
# because we're comparing the resource usage (`rusage`) from the original process to
# the forked process.
# If background tasks are running on the main process or this is the worker in
# charge of them, start collecting the phone home stats and shared usage metrics.
if hs.config.worker.run_background_tasks:
hs.start_background_tasks()
# TODO: This should be moved to same pattern we use for other background tasks:
# Add to `REQUIRED_ON_BACKGROUND_TASK_STARTUP` and rely on
# `start_background_tasks` to start it.
await hs.get_common_usage_metrics_manager().setup()
# TODO: This feels like another pattern that should refactored as one of the
# `REQUIRED_ON_BACKGROUND_TASK_STARTUP`
start_phone_stats_home(hs)
# We now freeze all allocated objects in the hopes that (almost)

View File

@@ -22,7 +22,6 @@
import argparse
import errno
import importlib.resources as importlib_resources
import logging
import os
import re
@@ -47,6 +46,7 @@ from typing import (
import attr
import jinja2
import pkg_resources
import yaml
from synapse.types import StrSequence
@@ -174,8 +174,8 @@ class Config:
self.root = root_config
# Get the path to the default Synapse template directory
self.default_template_dir = str(
importlib_resources.files("synapse").joinpath("res").joinpath("templates")
self.default_template_dir = pkg_resources.resource_filename(
"synapse", "res/templates"
)
@staticmethod

View File

@@ -18,13 +18,13 @@
# [This file includes modifications made by New Vector Limited]
#
#
import importlib.resources as importlib_resources
import json
import re
from typing import Any, Dict, Iterable, List, Optional, Pattern
from urllib import parse as urlparse
import attr
import pkg_resources
from synapse.types import JsonDict, StrSequence
@@ -64,12 +64,7 @@ class OembedConfig(Config):
"""
# Whether to use the packaged providers.json file.
if not oembed_config.get("disable_default_providers") or False:
path = (
importlib_resources.files("synapse")
.joinpath("res")
.joinpath("providers.json")
)
with path.open("r", encoding="utf-8") as s:
with pkg_resources.resource_stream("synapse", "res/providers.json") as s:
providers = json.load(s)
yield from self._parse_and_validate_provider(

View File

@@ -37,7 +37,6 @@ Events are replicated via a separate events stream.
"""
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Dict,
@@ -68,25 +67,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class QueueNames(str, Enum):
PRESENCE_MAP = "presence_map"
KEYED_EDU = "keyed_edu"
KEYED_EDU_CHANGED = "keyed_edu_changed"
EDUS = "edus"
POS_TIME = "pos_time"
PRESENCE_DESTINATIONS = "presence_destinations"
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
for queue_name in QueueNames:
queue_name_to_gauge_map[queue_name] = LaterGauge(
name=f"synapse_federation_send_queue_{queue_name.value}_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""
@@ -131,16 +111,23 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(queue_name: QueueNames, queue: Sized) -> None:
queue_name_to_gauge_map[queue_name].register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(queue)},
def register(name: str, queue: Sized) -> None:
LaterGauge(
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
)
for queue_name in QueueNames:
queue = getattr(self, queue_name.value)
assert isinstance(queue, Sized)
register(queue_name, queue=queue)
for queue_name in [
"presence_map",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
self.clock.looping_call(self._clear_queue, 30 * 1000)

View File

@@ -150,7 +150,6 @@ from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics
from synapse.api.constants import EventTypes, Membership
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import (
@@ -200,24 +199,6 @@ sent_pdus_destination_dist_total = Counter(
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_destinations_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_pdus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_edus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
@@ -417,9 +398,11 @@ class FederationSender(AbstractFederationSender):
# map from destination to PerDestinationQueue
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
transaction_queue_pending_destinations_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
@@ -427,17 +410,22 @@ class FederationSender(AbstractFederationSender):
)
},
)
transaction_queue_pending_pdus_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
},
)
transaction_queue_pending_edus_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
@@ -656,31 +644,6 @@ class FederationSender(AbstractFederationSender):
)
return
# If we've rescinded an invite then we want to tell the
# other server.
if (
event.type == EventTypes.Member
and event.membership == Membership.LEAVE
and event.sender != event.state_key
):
# We check if this leave event is rescinding an invite
# by looking if there is an invite event for the user in
# the auth events. It could otherwise be a kick or
# unban, which we don't want to send (if the user wasn't
# already in the room).
auth_events = await self.store.get_events_as_list(
event.auth_event_ids()
)
for auth_event in auth_events:
if (
auth_event.type == EventTypes.Member
and auth_event.state_key == event.state_key
and auth_event.membership == Membership.INVITE
):
destinations = set(destinations)
destinations.add(get_domain_from_id(event.state_key))
break
sharded_destinations = {
d
for d in destinations

View File

@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple
from twisted.internet.interfaces import IDelayedCall
from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
@@ -45,6 +45,7 @@ from synapse.types import (
)
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -146,10 +147,37 @@ class DelayedEventsHandler:
)
async def _unsafe_process_new_event(self) -> None:
# We purposefully fetch the current max room stream ordering before
# doing anything else, as it could increment duing processing of state
# deltas. We want to avoid updating `delayed_events_stream_pos` past
# the stream ordering of the state deltas we've processed. Otherwise
# we'll leave gaps in our processing.
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
self._event_pos = room_max_stream_ordering
logger.debug(
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
room_max_stream_ordering,
)
await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)
event_processing_positions.labels(
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
).set(room_max_stream_ordering)
return
# If self._event_pos is None then means we haven't fetched it from the DB yet
if self._event_pos is None:
self._event_pos = await self._store.get_delayed_events_stream_pos()
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
@@ -167,7 +195,7 @@ class DelayedEventsHandler:
self._clock, name="delayed_events_delta", server_name=self.server_name
):
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
if self._event_pos >= room_max_stream_ordering:
return
logger.debug(
@@ -202,23 +230,81 @@ class DelayedEventsHandler:
Process current state deltas to cancel other users' pending delayed events
that target the same state.
"""
# Get the senders of each delta's state event (as sender information is
# not currently stored in the `current_state_deltas` table).
event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
[delta.event_id for delta in deltas if delta.event_id is not None]
)
# Note: No need to batch as `get_current_state_deltas` will only ever
# return 100 rows at a time.
for delta in deltas:
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)
# `delta.event_id` and `delta.sender` can be `None` in a few valid
# cases (see the docstring of
# `get_current_state_delta_membership_changes_for_user` for details).
if delta.event_id is None:
logger.debug(
"Not handling delta for deleted state: %r %r",
# TODO: Differentiate between this being caused by a state reset
# which removed a user from a room, or the homeserver
# purposefully having left the room. We can do so by checking
# whether there are any local memberships still left in the
# room. If so, then this is the result of a state reset.
#
# If it is a state reset, we should avoid cancelling new,
# delayed state events due to old state resurfacing. So we
# should skip and log a warning in this case.
#
# If the homeserver has left the room, then we should cancel all
# delayed state events intended for this room, as there is no
# need to try and send a delayed event into a room we've left.
logger.warning(
"Skipping state delta (%r, %r) without corresponding event ID. "
"This can happen if the homeserver has left the room (in which "
"case this can be ignored), or if there has been a state reset "
"which has caused the sender to be kicked out of the room",
delta.event_type,
delta.state_key,
)
continue
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
sender_str = event_id_and_sender_dict.get(
delta.event_id, Sentinel.UNSET_SENTINEL
)
event = await self._store.get_event(delta.event_id, allow_none=True)
if not event:
if sender_str is None:
# An event exists, but the `sender` field was "null" and Synapse
# incorrectly accepted the event. This is not expected.
logger.error(
"Skipping state delta with event ID '%s' as 'sender' was None. "
"This is unexpected - please report it as a bug!",
delta.event_id,
)
continue
if sender_str is Sentinel.UNSET_SENTINEL:
# We have an event ID, but the event was not found in the
# datastore. This can happen if a room, or its history, is
# purged. State deltas related to the room are left behind, but
# the event no longer exists.
#
# As we cannot get the sender of this event, we can't calculate
# whether to cancel delayed events related to this one. So we skip.
logger.debug(
"Skipping state delta with event ID '%s' - the room, or its history, may have been purged",
delta.event_id,
)
continue
try:
sender = UserID.from_string(sender_str)
except SynapseError as e:
logger.error(
"Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
sender_str,
e,
)
continue
sender = UserID.from_string(event.sender)
next_send_ts = await self._store.cancel_delayed_state_events(
room_id=delta.room_id,

View File

@@ -248,10 +248,9 @@ class FederationEventHandler:
self.room_queues[room_id].append((pdu, origin))
return
# If we're not in the room just ditch the event entirely (and not
# invited). This is probably an old server that has come back and thinks
# we're still in the room (or we've been rejoined to the room by a state
# reset).
# If we're not in the room just ditch the event entirely. This is
# probably an old server that has come back and thinks we're still in
# the room (or we've been rejoined to the room by a state reset).
#
# Note that if we were never in the room then we would have already
# dropped the event, since we wouldn't know the room version.
@@ -259,43 +258,6 @@ class FederationEventHandler:
room_id, self.server_name
)
if not is_in_room:
# Check if this is a leave event rescinding an invite
if (
pdu.type == EventTypes.Member
and pdu.membership == Membership.LEAVE
and pdu.state_key != pdu.sender
and self._is_mine_id(pdu.state_key)
):
(
membership,
membership_event_id,
) = await self._store.get_local_current_membership_for_user_in_room(
pdu.state_key, pdu.room_id
)
if (
membership == Membership.INVITE
and membership_event_id
and membership_event_id
in pdu.auth_event_ids() # The invite should be in the auth events of the rescission.
):
invite_event = await self._store.get_event(
membership_event_id, allow_none=True
)
# We cannot fully auth the rescission event, but we can
# check if the sender of the leave event is the same as the
# invite.
#
# Technically, a room admin could rescind the invite, but we
# have no way of knowing who is and isn't a room admin.
if invite_event and pdu.sender == invite_event.sender:
# Handle the rescission event
pdu.internal_metadata.outlier = True
pdu.internal_metadata.out_of_band_membership = True
context = EventContext.for_outlier(self._storage_controllers)
await self.persist_events_and_notify(room_id, [(pdu, context)])
return
logger.info(
"Ignoring PDU from %s as we're not in the room",
origin,

View File

@@ -173,18 +173,6 @@ state_transition_counter = Counter(
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)
presence_user_to_current_state_size_gauge = LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
presence_wheel_timer_size_gauge = LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -791,9 +779,11 @@ class PresenceHandler(BasePresenceHandler):
EduTypes.PRESENCE, self.incoming_presence
)
presence_user_to_current_state_size_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.user_to_current_state)},
LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
)
# The per-device presence state, maps user to devices to per-device presence state.
@@ -892,9 +882,11 @@ class PresenceHandler(BasePresenceHandler):
60 * 1000,
)
presence_wheel_timer_size_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.wheel_timer)},
LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
)
# Used to handle sending of presence to newly joined users/servers
@@ -1548,7 +1540,7 @@ class PresenceHandler(BasePresenceHandler):
self.clock, name="presence_delta", server_name=self.server_name
):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
if self._event_pos >= room_max_stream_ordering:
return
logger.debug(

View File

@@ -13,7 +13,6 @@
#
import enum
import logging
from itertools import chain
from typing import (
@@ -75,6 +74,7 @@ from synapse.types.handlers.sliding_sync import (
)
from synapse.types.state import StateFilter
from synapse.util import MutableOverlayMapping
from synapse.util.sentinel import Sentinel
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -83,12 +83,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
# type of a dictionary lookup and subsequent type narrowing.
UNSET_SENTINEL = object()
# Helper definition for the types that we might return. We do this to avoid
# copying data between types (which can be expensive for many rooms).
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]

View File

@@ -20,6 +20,7 @@
#
import itertools
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
@@ -27,11 +28,14 @@ from typing import (
Dict,
FrozenSet,
List,
Literal,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
overload,
)
import attr
@@ -116,6 +120,25 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
SyncRequestKey = Tuple[Any, ...]
class SyncVersion(Enum):
"""
Enum for specifying the version of sync request. This is used to key which type of
sync response that we are generating.
This is different than the `sync_type` you might see used in other code below; which
specifies the sub-type sync request (e.g. initial_sync, full_state_sync,
incremental_sync) and is really only relevant for the `/sync` v2 endpoint.
"""
# These string values are semantically significant because they are used in the the
# metrics
# Traditional `/sync` endpoint
SYNC_V2 = "sync_v2"
# Part of MSC3575 Sliding Sync
E2EE_SYNC = "e2ee_sync"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
user: UserID
@@ -285,6 +308,26 @@ class SyncResult:
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class E2eeSyncResult:
"""
Attributes:
next_batch: Token for the next sync
to_device: List of direct messages for the device.
device_lists: List of user_ids whose devices have changed
device_one_time_keys_count: Dict of algorithm to count for one time keys
for this device
device_unused_fallback_key_types: List of key types that have an unused fallback
key
"""
next_batch: StreamToken
to_device: List[JsonDict]
device_lists: DeviceListUpdates
device_one_time_keys_count: JsonMapping
device_unused_fallback_key_types: List[str]
class SyncHandler:
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
@@ -330,15 +373,52 @@ class SyncHandler:
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> SyncResult:
) -> SyncResult: ...
@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> E2eeSyncResult: ...
@overload
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]: ...
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SyncConfig,
sync_version: SyncVersion,
request_key: SyncRequestKey,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
@@ -353,7 +433,8 @@ class SyncHandler:
full_state: Whether to return the full state for each room.
Returns:
returns a full `SyncResult`.
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
@@ -365,6 +446,7 @@ class SyncHandler:
request_key,
self._wait_for_sync_for_user,
sync_config,
sync_version,
since_token,
timeout,
full_state,
@@ -373,14 +455,48 @@ class SyncHandler:
logger.debug("Returning sync response for %s", user_id)
return res
@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> SyncResult:
) -> SyncResult: ...
@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> E2eeSyncResult: ...
@overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> Union[SyncResult, E2eeSyncResult]: ...
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
) -> Union[SyncResult, E2eeSyncResult]:
"""The start of the machinery that produces a /sync response.
See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
@@ -401,7 +517,7 @@ class SyncHandler:
else:
sync_type = "incremental_sync"
sync_label = f"sync_v2:{sync_type}"
sync_label = f"{sync_version}:{sync_type}"
context = current_context()
if context:
@@ -462,15 +578,19 @@ class SyncHandler:
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = await self.current_sync_for_user(
sync_config, since_token, full_state=full_state
result: Union[
SyncResult, E2eeSyncResult
] = await self.current_sync_for_user(
sync_config, sync_version, since_token, full_state=full_state
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SyncResult:
return await self.current_sync_for_user(sync_config, since_token)
) -> Union[SyncResult, E2eeSyncResult]:
return await self.current_sync_for_user(
sync_config, sync_version, since_token
)
result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
@@ -503,15 +623,43 @@ class SyncHandler:
return result
@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.SYNC_V2],
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
) -> SyncResult: ...
@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: Literal[SyncVersion.E2EE_SYNC],
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> E2eeSyncResult: ...
@overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]: ...
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> Union[SyncResult, E2eeSyncResult]:
"""
Generates the response body of a sync result, represented as a
`SyncResult`.
`SyncResult`/`E2eeSyncResult`.
This is a wrapper around `generate_sync_result` which starts an open tracing
span to track the sync. See `generate_sync_result` for the next part of your
@@ -524,15 +672,28 @@ class SyncHandler:
full_state: Whether to return the full state for each room.
Returns:
returns a full `SyncResult`.
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
# Go through the `/sync` v2 path
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)
if sync_version == SyncVersion.SYNC_V2:
sync_result: Union[
SyncResult, E2eeSyncResult
] = await self.generate_sync_result(
sync_config, since_token, full_state
)
# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
elif sync_version == SyncVersion.E2EE_SYNC:
sync_result = await self.generate_e2ee_sync_result(
sync_config, since_token
)
else:
raise Exception(
f"Unknown sync_version (this is a Synapse problem): {sync_version}"
)
set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
@@ -1807,6 +1968,102 @@ class SyncHandler:
next_batch=sync_result_builder.now_token,
)
async def generate_e2ee_sync_result(
self,
sync_config: SyncConfig,
since_token: Optional[StreamToken] = None,
) -> E2eeSyncResult:
"""
Generates the response body of a MSC3575 Sliding Sync `/sync/e2ee` result.
This is represented by a `E2eeSyncResult` struct, which is built from small
pieces using a `SyncResultBuilder`. The `sync_result_builder` is passed as a
mutable ("inout") parameter to various helper functions. These retrieve and
process the data which forms the sync body, often writing to the
`sync_result_builder` to store their output.
At the end, we transfer data from the `sync_result_builder` to a new `E2eeSyncResult`
instance to signify that the sync calculation is complete.
"""
user_id = sync_config.user.to_string()
app_service = self.store.get_app_service_by_user_id(user_id)
if app_service:
# We no longer support AS users using /sync directly.
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
sync_result_builder = await self.get_sync_result_builder(
sync_config,
since_token,
full_state=False,
)
# 1. Calculate `to_device` events
await self._generate_sync_entry_for_to_device(sync_result_builder)
# 2. Calculate `device_lists`
# Device list updates are sent if a since token is provided.
device_lists = DeviceListUpdates()
include_device_list_updates = bool(since_token and since_token.device_list_key)
if include_device_list_updates:
# Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
# is used in calculate_user_changes below.
#
# TODO: Running `_generate_sync_entry_for_rooms()` is a lot of work just to
# figure out the membership changes/derived info needed for
# `_generate_sync_entry_for_device_list()`. In the future, we should try to
# refactor this away.
(
newly_joined_rooms,
newly_left_rooms,
) = await self._generate_sync_entry_for_rooms(sync_result_builder)
# This uses the sync_result_builder.joined which is set in
# `_generate_sync_entry_for_rooms`, if that didn't find any joined
# rooms for some reason it is a no-op.
(
newly_joined_or_invited_or_knocked_users,
newly_left_users,
) = sync_result_builder.calculate_user_changes()
# include_device_list_updates can only be True if we have a
# since token.
assert since_token is not None
device_lists = await self._device_handler.generate_sync_entry_for_device_list(
user_id=user_id,
since_token=since_token,
now_token=sync_result_builder.now_token,
joined_room_ids=sync_result_builder.joined_room_ids,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)
# 3. Calculate `device_one_time_keys_count` and `device_unused_fallback_key_types`
device_id = sync_config.device_id
one_time_keys_count: JsonMapping = {}
unused_fallback_key_types: List[str] = []
if device_id:
# TODO: We should have a way to let clients differentiate between the states of:
# * no change in OTK count since the provided since token
# * the server has zero OTKs left for this device
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
one_time_keys_count = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
unused_fallback_key_types = list(
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
)
return E2eeSyncResult(
to_device=sync_result_builder.to_device,
device_lists=device_lists,
device_one_time_keys_count=one_time_keys_count,
device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)
async def get_sync_result_builder(
self,
sync_config: SyncConfig,

View File

@@ -164,13 +164,11 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
return counts
in_flight_requests = LaterGauge(
LaterGauge(
name="synapse_http_server_in_flight_requests_count",
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)
in_flight_requests.register_hook(
homeserver_instance_id=None, hook=_get_in_flight_counts
caller=_get_in_flight_counts,
)

View File

@@ -227,16 +227,7 @@ LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"]
class _Sentinel:
"""
Sentinel to represent the root context
This should only be used for tasks outside of Synapse like when we yield control
back to the Twisted reactor (event loop) so we don't leak the current logging
context to other tasks that are scheduled next in the event loop.
Nothing from the Synapse homeserver should be logged with the sentinel context. i.e.
we should always know which server the logs are coming from.
"""
"""Sentinel to represent the root context"""
__slots__ = ["previous_context", "finished", "request", "tag"]
@@ -625,17 +616,9 @@ class LoggingContextFilter(logging.Filter):
class PreserveLoggingContext:
"""
Context manager which replaces the logging context
"""Context manager which replaces the logging context
The previous logging context is restored on exit.
`make_deferred_yieldable` is pretty equivalent to using `with
PreserveLoggingContext():` (using the default sentinel context), i.e. it clears the
logcontext before awaiting (and so before execution passes back to the reactor) and
restores the old context once the awaitable completes (execution passes from the
reactor back to the code).
"""
The previous logging context is restored on exit."""
__slots__ = ["_old_context", "_new_context"]
@@ -801,14 +784,6 @@ def run_in_background(
return from the function, and that the sentinel context is set once the
deferred returned by the function completes.
To explain how the log contexts work here:
- When this function is called, the current context is stored ("original"), we kick
off the background task, and we restore that original context before returning
- When the background task finishes, we don't want to leak our context into the
reactor which would erroneously get attached to the next operation picked up by
the event loop. We add a callback to the deferred which will clear the logging
context after it finishes and yields control back to the reactor.
Useful for wrapping functions that return a deferred or coroutine, which you don't
yield or await on (for instance because you want to pass it to
deferred.gatherResults()).
@@ -820,13 +795,8 @@ def run_in_background(
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
CRITICAL error about an unhandled error will be logged without much
indication about where it came from.
Returns:
Deferred which returns the result of func, or `None` if func raises.
Note that the returned Deferred does not follow the synapse logcontext
rules.
"""
calling_context = current_context()
current = current_context()
try:
res = f(*args, **kwargs)
except Exception:
@@ -836,9 +806,6 @@ def run_in_background(
# `res` may be a coroutine, `Deferred`, some other kind of awaitable, or a plain
# value. Convert it to a `Deferred`.
#
# Wrapping the value in a deferred has the side effect of executing the coroutine,
# if it is one. If it's already a deferred, then we can just use that.
d: "defer.Deferred[R]"
if isinstance(res, typing.Coroutine):
# Wrap the coroutine in a `Deferred`.
@@ -853,24 +820,20 @@ def run_in_background(
# `res` is a plain value. Wrap it in a `Deferred`.
d = defer.succeed(res)
# The deferred has already completed
if d.called and not d.paused:
# The function should have maintained the logcontext, so we can
# optimise out the messing about
return d
# The function may have reset the context before returning, so we need to restore it
# now.
#
# Our goal is to have the caller logcontext unchanged after firing off the
# background task and returning.
set_current_context(calling_context)
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original logcontext will be restored when the deferred completes, but
# there is nothing waiting for it, so it will get leaked into the reactor (which
# would then get picked up by the next thing the reactor does). We therefore
# need to reset the logcontext here (set the `sentinel` logcontext) before
# yielding control back to the reactor.
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
@@ -878,7 +841,7 @@ def run_in_background(
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
d.addBoth(_set_context_cb, ctx)
return d
@@ -896,34 +859,20 @@ def run_coroutine_in_background(
coroutine directly rather than a function. We can do this because coroutines
do not run until called, and so calling an async function without awaiting
cannot change the log contexts.
This is an ergonomic helper so we can do this:
```python
run_coroutine_in_background(func1(arg1))
```
Rather than having to do this:
```python
run_in_background(lambda: func1(arg1))
```
"""
calling_context = current_context()
# Wrap the coroutine in a deferred, which will have the side effect of executing the
# coroutine in the background.
current = current_context()
d = defer.ensureDeferred(coroutine)
# The function may have reset the context before returning, so we need to restore it
# now.
#
# Our goal is to have the caller logcontext unchanged after firing off the
# background task and returning.
set_current_context(calling_context)
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original logcontext will be restored when the deferred completes, but
# there is nothing waiting for it, so it will get leaked into the reactor (which
# would then get picked up by the next thing the reactor does). We therefore
# need to reset the logcontext here (set the `sentinel` logcontext) before
# yielding control back to the reactor.
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
@@ -931,7 +880,7 @@ def run_coroutine_in_background(
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, SENTINEL_CONTEXT)
d.addBoth(_set_context_cb, ctx)
return d
@@ -939,43 +888,24 @@ T = TypeVar("T")
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
"""Given a deferred, make it follow the Synapse logcontext rules:
If the deferred has completed, essentially does nothing (just returns another
completed deferred with the result/failure).
If the deferred has not yet completed, resets the logcontext before
returning a deferred. Then, when the deferred completes, restores the
current logcontext before running callbacks/errbacks.
(This is more-or-less the opposite operation to run_in_background.)
"""
Given a deferred, make it follow the Synapse logcontext rules:
- If the deferred has completed, essentially does nothing (just returns another
completed deferred with the result/failure).
- If the deferred has not yet completed, resets the logcontext before returning a
incomplete deferred. Then, when the deferred completes, restores the current
logcontext before running callbacks/errbacks.
This means the resultant deferred can be awaited without leaking the current
logcontext to the reactor (which would then get erroneously picked up by the next
thing the reactor does), and also means that the logcontext is preserved when the
deferred completes.
(This is more-or-less the opposite operation to run_in_background in terms of how it
handles log contexts.)
Pretty much equivalent to using `with PreserveLoggingContext():`, i.e. it clears the
logcontext before awaiting (and so before execution passes back to the reactor) and
restores the old context once the awaitable completes (execution passes from the
reactor back to the code).
"""
# The deferred has already completed
if deferred.called and not deferred.paused:
# it looks like this deferred is ready to run any callbacks we give it
# immediately. We may as well optimise out the logcontext faffery.
return deferred
# Our goal is to have the caller logcontext unchanged after they yield/await the
# returned deferred.
#
# When the caller yield/await's the returned deferred, it may yield
# control back to the reactor. To avoid leaking the current logcontext to the
# reactor (which would then get erroneously picked up by the next thing the reactor
# does) while the deferred runs in the reactor event loop, we reset the logcontext
# and add a callback to the deferred to restore it so the caller's logcontext is
# active when the deferred completes.
# ok, we can't be sure that a yield won't block, so let's reset the
# logcontext, and add a callback to the deferred to restore it.
prev_context = set_current_context(SENTINEL_CONTEXT)
deferred.addBoth(_set_context_cb, prev_context)
return deferred

View File

@@ -43,7 +43,7 @@ from typing import (
)
import attr
from packaging.version import parse as parse_version
from pkg_resources import parse_version
from prometheus_client import (
CollectorRegistry,
Counter,
@@ -73,6 +73,8 @@ logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics"
all_gauges: Dict[str, Collector] = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
SERVER_NAME_LABEL = "server_name"
@@ -161,110 +163,42 @@ class LaterGauge(Collector):
name: str
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
_instance_id_to_hook_map: Dict[
Optional[str], # instance_id
Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
] = attr.ib(factory=dict, hash=False)
"""
Map from homeserver instance_id to a callback. Each callback should either return a
value (if there are no labels for this metric), or dict mapping from a label tuple
to a value.
We use `instance_id` instead of `server_name` because it's possible to have multiple
workers running in the same process with the same `server_name`.
"""
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
def collect(self) -> Iterable[Metric]:
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
# (we don't enforce it here, one level up).
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
for homeserver_instance_id, hook in self._instance_id_to_hook_map.items():
try:
hook_result = hook()
except Exception:
logger.exception(
"Exception running callback for LaterGauge(%s) for homeserver_instance_id=%s",
self.name,
homeserver_instance_id,
)
# Continue to return the rest of the metrics that aren't broken
continue
try:
calls = self.caller()
except Exception:
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
if isinstance(hook_result, (int, float)):
g.add_metric([], hook_result)
else:
for k, v in hook_result.items():
g.add_metric(k, v)
if isinstance(calls, (int, float)):
g.add_metric([], calls)
else:
for k, v in calls.items():
g.add_metric(k, v)
yield g
def register_hook(
self,
*,
homeserver_instance_id: Optional[str],
hook: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
) -> None:
"""
Register a callback/hook that will be called to generate a metric samples for
the gauge.
Args:
homeserver_instance_id: The unique ID for this Synapse process instance
(`hs.get_instance_id()`) that this hook is associated with. This can be used
later to lookup all hooks associated with a given server name in order to
unregister them. This should only be omitted for global hooks that work
across all homeservers.
hook: A callback that should either return a value (if there are no
labels for this metric), or dict mapping from a label tuple to a value
"""
# We shouldn't have multiple hooks registered for the same homeserver `instance_id`.
existing_hook = self._instance_id_to_hook_map.get(homeserver_instance_id)
assert existing_hook is None, (
f"LaterGauge(name={self.name}) hook already registered for homeserver_instance_id={homeserver_instance_id}. "
"This is likely a Synapse bug and you forgot to unregister the previous hooks for "
"the server (especially in tests)."
)
self._instance_id_to_hook_map[homeserver_instance_id] = hook
def unregister_hooks_for_homeserver_instance_id(
self, homeserver_instance_id: str
) -> None:
"""
Unregister all hooks associated with the given homeserver `instance_id`. This should be
called when a homeserver is shutdown to avoid extra hooks sitting around.
Args:
homeserver_instance_id: The unique ID for this Synapse process instance to
unregister hooks for (`hs.get_instance_id()`).
"""
self._instance_id_to_hook_map.pop(homeserver_instance_id, None)
def __attrs_post_init__(self) -> None:
self._register()
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
# We shouldn't have multiple metrics with the same name. Typically, metrics
# should be created globally so you shouldn't be running into this and this will
# catch any stupid mistakes. The `REGISTRY.register(self)` call above will also
# raise an error if the metric already exists but to make things explicit, we'll
# also check here.
existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name)
assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists. "
# Keep track of the gauge so we can clean it up later.
all_later_gauges_to_clean_up_on_shutdown[self.name] = self
all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {}
"""
Track all `LaterGauge` instances so we can remove any associated hooks during homeserver
shutdown.
"""
all_gauges[self.name] = self
# `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -316,7 +250,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
# Protects access to _registrations
self._lock = threading.Lock()
REGISTRY.register(self)
self._register_with_collector()
def register(
self,
@@ -407,6 +341,14 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
"""

View File

@@ -228,11 +228,6 @@ def run_as_background_process(
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
normal synapse async function).
Because the returned Deferred does not follow the synapse logcontext rules, awaiting
the result of this function will result in the log context being cleared (bad). In
order to properly await the result of this function and maintain the current log
context, use `make_deferred_yieldable`.
Args:
desc: a description for this background process type
server_name: The homeserver name that this background process is being run for
@@ -285,18 +280,6 @@ def run_as_background_process(
name=desc, **{SERVER_NAME_LABEL: server_name}
).dec()
# To explain how the log contexts work here:
# - When this function is called, the current context is stored (using
# `PreserveLoggingContext`), we kick off the background task, and we restore the
# original context before returning (also part of `PreserveLoggingContext`).
# - When the background task finishes, we don't want to leak our background context
# into the reactor which would erroneously get attached to the next operation
# picked up by the event loop. We use `PreserveLoggingContext` to set the
# `sentinel` context and means the new `BackgroundProcessLoggingContext` will
# remember the `sentinel` context as its previous context to return to when it
# exits and yields control back to the reactor.
#
# TODO: Why can't we simplify to using `return run_in_background(run)`?
with PreserveLoggingContext():
# Note that we return a Deferred here so that it can be used in a
# looping_call and other places that expect a Deferred.

View File

@@ -86,24 +86,6 @@ users_woken_by_stream_counter = Counter(
labelnames=["stream", SERVER_NAME_LABEL],
)
notifier_listeners_gauge = LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_rooms_gauge = LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_users_gauge = LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
T = TypeVar("T")
@@ -299,20 +281,28 @@ class Notifier:
)
}
notifier_listeners_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(), hook=count_listeners
LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
)
notifier_rooms_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
},
)
notifier_users_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.user_to_user_stream)},
LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
)
def add_replication_callback(self, cb: Callable[[], None]) -> None:

View File

@@ -91,7 +91,7 @@ def _rule_to_template(rule: PushRule) -> Optional[Dict[str, Any]]:
unscoped_rule_id = _rule_id_from_namespaced(rule.rule_id)
template_name = _priority_class_to_template_name(rule.priority_class)
if template_name in ["override", "underride", "postcontent"]:
if template_name in ["override", "underride"]:
templaterule = {"conditions": rule.conditions, "actions": rule.actions}
elif template_name in ["sender", "room"]:
templaterule = {"actions": rule.actions}

View File

@@ -19,14 +19,10 @@
#
#
# Integer literals for push rule `kind`s
# This is used to store them in the database.
PRIORITY_CLASS_MAP = {
"underride": 1,
"sender": 2,
"room": 3,
# MSC4306
"postcontent": 6,
"content": 4,
"override": 5,
}

View File

@@ -106,18 +106,6 @@ user_ip_cache_counter = Counter(
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
)
tcp_resource_total_connections_gauge = LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
tcp_command_queue_gauge = LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
)
# the type of the entries in _command_queues_by_stream
_StreamCommandQueue = Deque[
@@ -255,9 +243,11 @@ class ReplicationCommandHandler:
# outgoing replication commands to.)
self._connections: List[IReplicationConnection] = []
tcp_resource_total_connections_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self._connections)},
LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._connections)},
)
# When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -276,9 +266,11 @@ class ReplicationCommandHandler:
# from that connection.
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
tcp_command_queue_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
caller=lambda: {
(stream_name, self.server_name): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
},

View File

@@ -527,10 +527,7 @@ pending_commands = LaterGauge(
name="synapse_replication_tcp_protocol_pending_commands",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
)
pending_commands.register_hook(
homeserver_instance_id=None,
hook=lambda: {
caller=lambda: {
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
},
)
@@ -547,10 +544,7 @@ transport_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
)
transport_send_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
caller=lambda: {
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
},
)
@@ -577,10 +571,7 @@ tcp_transport_kernel_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
)
tcp_transport_kernel_send_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
caller=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
@@ -591,10 +582,7 @@ tcp_transport_kernel_read_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
)
tcp_transport_kernel_read_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
caller=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},

View File

@@ -92,9 +92,9 @@ class ExperimentalFeaturesRestServlet(RestServlet):
user_features = {}
for feature in ExperimentalFeature:
if feature in enabled_features:
user_features[feature.value] = True
user_features[feature] = True
else:
user_features[feature.value] = False
user_features[feature] = False
return HTTPStatus.OK, {"features": user_features}
async def on_PUT(

View File

@@ -109,12 +109,6 @@ class ProfileFieldRestServlet(RestServlet):
self.hs = hs
self.profile_handler = hs.get_profile_handler()
self.auth = hs.get_auth()
if hs.config.experimental.msc4133_enabled:
self.PATTERNS.append(
re.compile(
r"^/_matrix/client/unstable/uk\.tcpip\.msc4133/profile/(?P<user_id>[^/]*)/(?P<field_name>[^/]*)"
)
)
async def on_GET(
self, request: SynapseRequest, user_id: str, field_name: str

View File

@@ -19,11 +19,9 @@
#
#
from http import HTTPStatus
from typing import TYPE_CHECKING, List, Tuple, Union
from synapse.api.errors import (
Codes,
NotFoundError,
StoreError,
SynapseError,
@@ -241,15 +239,6 @@ def _rule_spec_from_path(path: List[str]) -> RuleSpec:
def _rule_tuple_from_request_object(
rule_template: str, rule_id: str, req_obj: JsonDict
) -> Tuple[List[JsonDict], List[Union[str, JsonDict]]]:
if rule_template == "postcontent":
# postcontent is from MSC4306, which says that clients
# cannot create their own postcontent rules right now.
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"user-defined rules using `postcontent` are not accepted",
errcode=Codes.INVALID_PARAM,
)
if rule_template in ["override", "underride"]:
if "conditions" not in req_obj:
raise InvalidRuleException("Missing 'conditions'")

View File

@@ -42,6 +42,7 @@ from synapse.handlers.sync import (
KnockedSyncResult,
SyncConfig,
SyncResult,
SyncVersion,
)
from synapse.http.server import HttpServer
from synapse.http.servlet import (
@@ -266,6 +267,7 @@ class SyncRestServlet(RestServlet):
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
request_key,
since_token=since_token,
timeout=timeout,
@@ -630,6 +632,177 @@ class SyncRestServlet(RestServlet):
return result
class SlidingSyncE2eeRestServlet(RestServlet):
"""
API endpoint for MSC3575 Sliding Sync `/sync/e2ee`. This is being introduced as part
of Sliding Sync but doesn't have any sliding window component. It's just a way to
get E2EE events without having to sit through a big initial sync (`/sync` v2). And
we can avoid encryption events being backed up by the main sync response.
Having To-Device messages split out to this sync endpoint also helps when clients
need to have 2 or more sync streams open at a time, e.g a push notification process
and a main process. This can cause the two processes to race to fetch the To-Device
events, resulting in the need for complex synchronisation rules to ensure the token
is correctly and atomically exchanged between processes.
GET parameters::
timeout(int): How long to wait for new events in milliseconds.
since(batch_token): Batch token when asking for incremental deltas.
Response JSON::
{
"next_batch": // batch token for the next /sync
"to_device": {
// list of to-device events
"events": [
{
"content: { "algorithm": "m.olm.v1.curve25519-aes-sha2", "ciphertext": { ... }, "org.matrix.msgid": "abcd", "session_id": "abcd" },
"type": "m.room.encrypted",
"sender": "@alice:example.com",
}
// ...
]
},
"device_lists": {
"changed": ["@alice:example.com"],
"left": ["@bob:example.com"]
},
"device_one_time_keys_count": {
"signed_curve25519": 50
},
"device_unused_fallback_key_types": [
"signed_curve25519"
]
}
"""
PATTERNS = client_patterns(
"/org.matrix.msc3575/sync/e2ee$", releases=[], v1=False, unstable=True
)
def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.sync_handler = hs.get_sync_handler()
# Filtering only matters for the `device_lists` because it requires a bunch of
# derived information from rooms (see how `_generate_sync_entry_for_rooms()`
# prepares a bunch of data for `_generate_sync_entry_for_device_list()`).
self.only_member_events_filter_collection = FilterCollection(
self.hs,
{
"room": {
# We only care about membership events for the `device_lists`.
# Membership will tell us whether a user has joined/left a room and
# if there are new devices to encrypt for.
"timeline": {
"types": ["m.room.member"],
},
"state": {
"types": ["m.room.member"],
},
# We don't want any extra account_data generated because it's not
# returned by this endpoint. This helps us avoid work in
# `_generate_sync_entry_for_rooms()`
"account_data": {
"not_types": ["*"],
},
# We don't want any extra ephemeral data generated because it's not
# returned by this endpoint. This helps us avoid work in
# `_generate_sync_entry_for_rooms()`
"ephemeral": {
"not_types": ["*"],
},
},
# We don't want any extra account_data generated because it's not
# returned by this endpoint. (This is just here for good measure)
"account_data": {
"not_types": ["*"],
},
# We don't want any extra presence data generated because it's not
# returned by this endpoint. (This is just here for good measure)
"presence": {
"not_types": ["*"],
},
},
)
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req_experimental_feature(
request, allow_guest=True, feature=ExperimentalFeature.MSC3575
)
user = requester.user
device_id = requester.device_id
timeout = parse_integer(request, "timeout", default=0)
since = parse_string(request, "since")
sync_config = SyncConfig(
user=user,
filter_collection=self.only_member_events_filter_collection,
is_guest=requester.is_guest,
device_id=device_id,
use_state_after=False, # We don't return any rooms so this flag is a no-op
)
since_token = None
if since is not None:
since_token = await StreamToken.from_string(self.store, since)
# Request cache key
request_key = (
SyncVersion.E2EE_SYNC,
user,
timeout,
since,
)
# Gather data for the response
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.E2EE_SYNC,
request_key,
since_token=since_token,
timeout=timeout,
full_state=False,
)
# The client may have disconnected by now; don't bother to serialize the
# response if so.
if request._disconnected:
logger.info("Client has disconnected; not serializing response.")
return 200, {}
response: JsonDict = defaultdict(dict)
response["next_batch"] = await sync_result.next_batch.to_string(self.store)
if sync_result.to_device:
response["to_device"] = {"events": sync_result.to_device}
if sync_result.device_lists.changed:
response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
if sync_result.device_lists.left:
response["device_lists"]["left"] = list(sync_result.device_lists.left)
# We always include this because https://github.com/vector-im/element-android/issues/3725
# The spec isn't terribly clear on when this can be omitted and how a client would tell
# the difference between "no keys present" and "nothing changed" in terms of whole field
# absent / individual key type entry absent
# Corresponding synapse issue: https://github.com/matrix-org/synapse/issues/10456
response["device_one_time_keys_count"] = sync_result.device_one_time_keys_count
# https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
# states that this field should always be included, as long as the server supports the feature.
response["device_unused_fallback_key_types"] = (
sync_result.device_unused_fallback_key_types
)
return 200, response
class SlidingSyncRestServlet(RestServlet):
"""
API endpoint for MSC3575 Sliding Sync `/sync`. Allows for clients to request a
@@ -1081,3 +1254,4 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)
SlidingSyncRestServlet(hs).register(http_server)
SlidingSyncE2eeRestServlet(hs).register(http_server)

View File

@@ -175,7 +175,6 @@ class VersionsRestServlet(RestServlet):
"org.matrix.simplified_msc3575": msc3575_enabled,
# Arbitrary key-value profile fields.
"uk.tcpip.msc4133": self.config.experimental.msc4133_enabled,
"uk.tcpip.msc4133.stable": True,
# MSC4155: Invite filtering
"org.matrix.msc4155": self.config.experimental.msc4155_enabled,
# MSC4306: Support for thread subscriptions

View File

@@ -129,10 +129,7 @@ from synapse.http.client import (
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.media.media_repository import MediaRepository
from synapse.metrics import (
all_later_gauges_to_clean_up_on_shutdown,
register_threadpool,
)
from synapse.metrics import register_threadpool
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
from synapse.module_api import ModuleApi
from synapse.module_api.callbacks import ModuleApiCallbacks
@@ -366,36 +363,11 @@ class HomeServer(metaclass=abc.ABCMeta):
self.datastores = Databases(self.DATASTORE_CLASS, self)
logger.info("Finished setting up.")
def __del__(self) -> None:
"""
Called when an the homeserver is garbage collected.
Make sure we actually do some clean-up, rather than leak data.
"""
self.cleanup()
def cleanup(self) -> None:
"""
WIP: Clean-up any references to the homeserver and stop any running related
processes, timers, loops, replication stream, etc.
This should be called wherever you care about the HomeServer being completely
garbage collected like in tests. It's not necessary to call if you plan to just
shut down the whole Python process anyway.
Can be called multiple times.
"""
logger.info("Received cleanup request for %s.", self.hostname)
# TODO: Stop background processes, timers, loops, replication stream, etc.
# Cleanup metrics associated with the homeserver
for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values():
later_gauge.unregister_hooks_for_homeserver_instance_id(
self.get_instance_id()
)
logger.info("Cleanup complete for %s.", self.hostname)
# Register background tasks required by this server. This must be done
# somewhat manually due to the background tasks not being registered
# unless handlers are instantiated.
if self.config.worker.run_background_tasks:
self.setup_background_tasks()
def start_listening(self) -> None: # noqa: B027 (no-op by design)
"""Start the HTTP, manhole, metrics, etc listeners
@@ -404,7 +376,7 @@ class HomeServer(metaclass=abc.ABCMeta):
appropriate listeners.
"""
def start_background_tasks(self) -> None:
def setup_background_tasks(self) -> None:
"""
Some handlers have side effects on instantiation (like registering
background updates). This function causes them to be fetched, and

View File

@@ -682,6 +682,8 @@ class StateStorageController:
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.
A maximum of 100 rows will be returned.
"""
# FIXME(faster_joins): what do we do here?
# https://github.com/matrix-org/synapse/issues/13008

View File

@@ -61,7 +61,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.metrics import SERVER_NAME_LABEL, register_threadpool
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -611,6 +611,12 @@ class DatabasePool:
)
self.updates = BackgroundUpdater(hs, self)
LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self.updates.get_status()},
)
self._previous_txn_total_time = 0.0
self._current_txn_total_time = 0.0

View File

@@ -22,7 +22,6 @@
import logging
from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_conn
from synapse.storage.databases.main.events import PersistEventsStore
@@ -41,13 +40,6 @@ logger = logging.getLogger(__name__)
DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True)
background_update_status = LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=["database_name", SERVER_NAME_LABEL],
)
class Databases(Generic[DataStoreT]):
"""The various databases.
@@ -151,15 +143,6 @@ class Databases(Generic[DataStoreT]):
db_conn.close()
# Track the background update status for each database
background_update_status.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(database.name(), server_name): database.updates.get_status()
for database in self.databases
},
)
# Sanity check that we have actually configured all the required stores.
if not main:
raise Exception("No 'main' database configured")

View File

@@ -182,6 +182,21 @@ class DelayedEventsStore(SQLBaseStore):
"restart_delayed_event", restart_delayed_event_txn
)
async def get_count_of_delayed_events(self) -> int:
"""Returns the number of pending delayed events in the DB."""
def _get_count_of_delayed_events(txn: LoggingTransaction) -> int:
sql = "SELECT count(*) FROM delayed_events"
txn.execute(sql)
resp = txn.fetchone()
return resp[0] if resp is not None else 0
return await self.db_pool.runInteraction(
"get_count_of_delayed_events",
_get_count_of_delayed_events,
)
async def get_all_delayed_events_for_user(
self,
user_localpart: str,

View File

@@ -2135,6 +2135,39 @@ class EventsWorkerStore(SQLBaseStore):
return rows, to_token, True
async def get_senders_for_event_ids(
self, event_ids: Collection[str]
) -> Dict[str, Optional[str]]:
"""
Given a sequence of event IDs, return the sender associated with each.
Args:
event_ids: A collection of event IDs as strings.
Returns:
A dict of event ID -> sender of the event.
If a given event ID does not exist in the `events` table, then no entry
for that event ID will be returned.
"""
def _get_senders_for_event_ids(
txn: LoggingTransaction,
) -> Dict[str, Optional[str]]:
rows = self.db_pool.simple_select_many_txn(
txn=txn,
table="events",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=["event_id", "sender"],
)
return dict(rows)
return await self.db_pool.runInteraction(
"get_senders_for_event_ids", _get_senders_for_event_ids
)
@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
res = await self.db_pool.simple_select_one(

View File

@@ -84,13 +84,6 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
federation_known_servers_gauge = LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class EventIdMembership:
"""Returned by `get_membership_from_event_ids`"""
@@ -123,9 +116,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
1,
self._count_known_servers,
)
federation_known_servers_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): self._known_servers_count},
LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self._known_servers_count},
)
@wrap_as_background_process("_count_known_servers")

View File

@@ -94,6 +94,8 @@ class StateDeltasStore(SQLBaseStore):
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.
A maximum of 100 rows will be returned.
"""
prev_stream_id = int(prev_stream_id)

View File

@@ -131,28 +131,22 @@ def _get_counts_from_rate_limiter_instance(
# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
sleep_affected_hosts_gauge = LaterGauge(
LaterGauge(
name="synapse_rate_limit_sleep_affected_hosts",
desc="Number of hosts that had requests put to sleep",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
)
sleep_affected_hosts_gauge.register_hook(
homeserver_instance_id=None,
hook=lambda: _get_counts_from_rate_limiter_instance(
caller=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_sleep()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
)
),
)
reject_affected_hosts_gauge = LaterGauge(
LaterGauge(
name="synapse_rate_limit_reject_affected_hosts",
desc="Number of hosts that had requests rejected",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
)
reject_affected_hosts_gauge.register_hook(
homeserver_instance_id=None,
hook=lambda: _get_counts_from_rate_limiter_instance(
caller=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_reject()
for ratelimiter in rate_limiter_instance.ratelimiters.values()

21
synapse/util/sentinel.py Normal file
View File

@@ -0,0 +1,21 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import enum
class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
# type of a dictionary lookup and subsequent type narrowing.
UNSET_SENTINEL = object()

View File

@@ -44,13 +44,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
running_tasks_gauge = LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
)
class TaskScheduler:
"""
This is a simple task scheduler designed for resumable tasks. Normally,
@@ -137,9 +130,11 @@ class TaskScheduler:
TaskScheduler.SCHEDULE_INTERVAL_MS,
)
running_tasks_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self._running_tasks)},
LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._running_tasks)},
)
def register_action(

View File

@@ -35,7 +35,7 @@ from synapse.config._base import RootConfig
from synapse.config.auto_accept_invites import AutoAcceptInvitesConfig
from synapse.events.auto_accept_invites import InviteAutoAccepter
from synapse.federation.federation_base import event_from_pdu_json
from synapse.handlers.sync import JoinedSyncResult, SyncRequestKey
from synapse.handlers.sync import JoinedSyncResult, SyncRequestKey, SyncVersion
from synapse.module_api import ModuleApi
from synapse.rest import admin
from synapse.rest.client import login, room
@@ -548,6 +548,7 @@ def sync_join(
testcase.hs.get_sync_handler().wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
generate_request_key(),
since_token,
)

View File

@@ -36,7 +36,7 @@ from synapse.server import HomeServer
from synapse.types import JsonDict, StreamToken, create_requester
from synapse.util import Clock
from tests.handlers.test_sync import SyncRequestKey, generate_sync_config
from tests.handlers.test_sync import SyncRequestKey, SyncVersion, generate_sync_config
from tests.unittest import (
FederatingHomeserverTestCase,
HomeserverTestCase,
@@ -532,6 +532,7 @@ def sync_presence(
testcase.hs.get_sync_handler().wait_for_sync_for_user(
requester,
sync_config,
SyncVersion.SYNC_V2,
generate_request_key(),
since_token,
)

View File

@@ -37,6 +37,7 @@ from synapse.handlers.sync import (
SyncConfig,
SyncRequestKey,
SyncResult,
SyncVersion,
TimelineBatch,
)
from synapse.rest import admin
@@ -112,6 +113,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -122,6 +124,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
),
ResourceLimitError,
@@ -139,6 +142,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
),
ResourceLimitError,
@@ -163,6 +167,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
sync_config=generate_sync_config(
user, device_id="dev", use_state_after=self.use_state_after
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -198,6 +203,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
sync_config=generate_sync_config(
user, use_state_after=self.use_state_after
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -212,6 +218,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
sync_config=generate_sync_config(
user, device_id="dev", use_state_after=self.use_state_after
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_result.next_batch,
)
@@ -245,6 +252,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
sync_config=generate_sync_config(
user, use_state_after=self.use_state_after
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -259,6 +267,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
sync_config=generate_sync_config(
user, device_id="dev", use_state_after=self.use_state_after
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_result.next_batch,
)
@@ -301,6 +310,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(owner),
generate_sync_config(owner, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -326,6 +336,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
eve_requester,
eve_sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -352,6 +363,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
eve_requester,
eve_sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=eve_sync_after_ban.next_batch,
)
@@ -364,6 +376,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
eve_requester,
eve_sync_config,
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=None,
)
@@ -398,6 +411,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -427,6 +441,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_sync_result.next_batch,
)
@@ -472,6 +487,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -511,6 +527,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_sync_result.next_batch,
)
@@ -559,6 +576,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -585,6 +603,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_sync_result.next_batch,
)
@@ -624,6 +643,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=incremental_sync.next_batch,
)
@@ -697,6 +717,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -722,6 +743,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -747,6 +769,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=initial_sync_result.next_batch,
)
@@ -810,6 +833,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
bob_requester,
generate_sync_config(bob, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -843,6 +867,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
filter_collection=FilterCollection(self.hs, filter_dict),
use_state_after=self.use_state_after,
),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=None if initial_sync else initial_sync_result.next_batch,
)
@@ -942,6 +967,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -990,6 +1016,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(user2),
generate_sync_config(user2, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -1015,6 +1042,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
)
)
@@ -1051,6 +1079,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=since_token,
timeout=0,
@@ -1105,6 +1134,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user, use_state_after=self.use_state_after),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=since_token,
timeout=0,

View File

@@ -18,18 +18,11 @@
# [This file includes modifications made by New Vector Limited]
#
#
from typing import Dict, NoReturn, Protocol, Tuple
from typing import Dict, Protocol, Tuple
from prometheus_client.core import Sample
from synapse.metrics import (
REGISTRY,
SERVER_NAME_LABEL,
InFlightGauge,
LaterGauge,
all_later_gauges_to_clean_up_on_shutdown,
generate_latest,
)
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
from synapse.util.caches.deferred_cache import DeferredCache
from tests import unittest
@@ -292,95 +285,6 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
class LaterGaugeTests(unittest.HomeserverTestCase):
def setUp(self) -> None:
super().setUp()
self.later_gauge = LaterGauge(
name="foo",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
def tearDown(self) -> None:
super().tearDown()
REGISTRY.unregister(self.later_gauge)
all_later_gauges_to_clean_up_on_shutdown.pop(self.later_gauge.name, None)
def test_later_gauge_multiple_servers(self) -> None:
"""
Test that LaterGauge metrics are reported correctly across multiple servers. We
will have an metrics entry for each homeserver that is labeled with the
`server_name` label.
"""
self.later_gauge.register_hook(
homeserver_instance_id="123", hook=lambda: {("hs1",): 1}
)
self.later_gauge.register_hook(
homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
)
metrics_map = get_latest_metrics()
# Find the metrics from both homeservers
hs1_metric = 'foo{server_name="hs1"}'
hs1_metric_value = metrics_map.get(hs1_metric)
self.assertIsNotNone(
hs1_metric_value,
f"Missing metric {hs1_metric} in metrics {metrics_map}",
)
self.assertEqual(hs1_metric_value, "1.0")
hs2_metric = 'foo{server_name="hs2"}'
hs2_metric_value = metrics_map.get(hs2_metric)
self.assertIsNotNone(
hs2_metric_value,
f"Missing metric {hs2_metric} in metrics {metrics_map}",
)
self.assertEqual(hs2_metric_value, "2.0")
def test_later_gauge_hook_exception(self) -> None:
"""
Test that LaterGauge metrics are collected across multiple servers even if one
hooks is throwing an exception.
"""
def raise_exception() -> NoReturn:
raise Exception("fake error generating data")
# Make the hook for hs1 throw an exception
self.later_gauge.register_hook(
homeserver_instance_id="123", hook=raise_exception
)
# Metrics from hs2 still work fine
self.later_gauge.register_hook(
homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
)
metrics_map = get_latest_metrics()
# Since we encountered an exception while trying to collect metrics from hs1, we
# don't expect to see it here.
hs1_metric = 'foo{server_name="hs1"}'
hs1_metric_value = metrics_map.get(hs1_metric)
self.assertIsNone(
hs1_metric_value,
(
"Since we encountered an exception while trying to collect metrics from hs1"
f"we don't expect to see it the metrics_map {metrics_map}"
),
)
# We should still see metrics from hs2 though
hs2_metric = 'foo{server_name="hs2"}'
hs2_metric_value = metrics_map.get(hs2_metric)
self.assertIsNotNone(
hs2_metric_value,
f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
)
self.assertEqual(hs2_metric_value, "2.0")
def get_latest_metrics() -> Dict[str, str]:
"""
Collect the latest metrics from the registry and parse them into an easy to use map.

View File

@@ -19,7 +19,6 @@
#
#
from http import HTTPStatus
from typing import Any, Optional
from unittest.mock import AsyncMock, patch
@@ -31,7 +30,7 @@ from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.room_versions import RoomVersions
from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator
from synapse.rest import admin
from synapse.rest.client import login, push_rule, register, room
from synapse.rest.client import login, register, room
from synapse.server import HomeServer
from synapse.types import JsonDict, create_requester
from synapse.util import Clock
@@ -45,7 +44,6 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
room.register_servlets,
login.register_servlets,
register.register_servlets,
push_rule.register_servlets,
]
def prepare(
@@ -496,135 +494,6 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
)
)
@override_config({"experimental_features": {"msc4306_enabled": True}})
def test_thread_subscriptions_suppression_after_keyword_mention_overrides(
self,
) -> None:
"""
Tests one of the purposes of the `postcontent` push rule section:
When a keyword mention is configured (in the `content` section),
it does not get suppressed by the thread being unsubscribed.
"""
# add a keyword mention to alice's push rules
channel = self.make_request(
"PUT",
"/_matrix/client/v3/pushrules/global/content/biscuits",
{"pattern": "biscuits", "actions": ["notify"]},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
(thread_root_id,) = self.helper.send_messages(self.room_id, 1, tok=self.token)
self.assertFalse(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "do you want some cookies?",
"m.relates_to": {
"rel_type": RelationTypes.THREAD,
"event_id": thread_root_id,
},
},
type="m.room.message",
),
"alice is not subscribed to thread and does not have a mention on 'cookies' so should not be notified",
)
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "biscuits are available in the kitchen",
"m.relates_to": {
"rel_type": RelationTypes.THREAD,
"event_id": thread_root_id,
},
},
type="m.room.message",
),
"alice is not subscribed to thread but DOES have a mention on 'biscuits' so should be notified",
)
@override_config({"experimental_features": {"msc4306_enabled": True}})
def test_thread_subscriptions_notification_before_keywords_and_mentions(
self,
) -> None:
"""
Tests one of the purposes of the `postcontent` push rule section:
When a room is set to (what is commonly known as) 'keywords & mentions', we still receive notifications
for messages in threads that we are subscribed to.
Effectively making this 'keywords, mentions & subscriptions'
"""
# add a 'keywords & mentions' setting to the room alice's push rules
# In case this rule isn't clear: by adding a rule in the `room` section that does nothing,
# it stops execution of the push rules before we fall through to the `underride` section,
# where intuitively many kinds of messages will ambiently generate notifications.
# Mentions and keywords are triggered before the `room` block, so this doesn't suppress those.
channel = self.make_request(
"PUT",
f"/_matrix/client/v3/pushrules/global/room/{self.room_id}",
{"actions": []},
access_token=self.token,
)
self.assertEqual(channel.code, HTTPStatus.OK)
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
(thread_root_id,) = self.helper.send_messages(self.room_id, 1, tok=self.token)
# sanity check that our mentions still work
self.assertFalse(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "this is a plain message with no mention",
},
type="m.room.message",
),
"alice should not be notified (mentions & keywords room setting)",
)
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "this is a message that mentions alice",
},
type="m.room.message",
),
"alice should be notified (mentioned)",
)
# let's have alice subscribe to the thread
self.get_success(
self.hs.get_datastores().main.subscribe_user_to_thread(
self.alice,
self.room_id,
thread_root_id,
automatic_event_orderings=None,
)
)
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "some message in the thread",
"m.relates_to": {
"rel_type": RelationTypes.THREAD,
"event_id": thread_root_id,
},
},
type="m.room.message",
),
"alice is subscribed to thread so should be notified",
)
def test_with_disabled_thread_subscriptions(self) -> None:
"""
Test what happens with threaded events when MSC4306 is disabled.

View File

@@ -18,12 +18,12 @@
#
#
import email.message
import importlib.resources as importlib_resources
import os
from http import HTTPStatus
from typing import Any, Dict, List, Sequence, Tuple
import attr
import pkg_resources
from parameterized import parameterized
from twisted.internet.defer import Deferred
@@ -59,12 +59,11 @@ class EmailPusherTests(HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
config = self.default_config()
templates = (
importlib_resources.files("synapse").joinpath("res").joinpath("templates")
)
config["email"] = {
"enable_notifs": True,
"template_dir": os.path.abspath(str(templates)),
"template_dir": os.path.abspath(
pkg_resources.resource_filename("synapse", "res/templates")
),
"expiry_template_html": "notice_expiry.html",
"expiry_template_text": "notice_expiry.txt",
"notif_template_html": "notif_mail.html",

View File

@@ -32,6 +32,7 @@ from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocati
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.replication.http import ReplicationRestResource
from synapse.replication.tcp.client import ReplicationDataHandler
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.protocol import (
ClientReplicationStreamProtocol,
ServerReplicationStreamProtocol,
@@ -96,7 +97,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
self.test_handler = self._build_replication_data_handler()
self.worker_hs._replication_data_handler = self.test_handler # type: ignore[attr-defined]
repl_handler = self.worker_hs.get_replication_command_handler()
repl_handler = ReplicationCommandHandler(self.worker_hs)
self.client = ClientReplicationStreamProtocol(
self.worker_hs,
"client",

View File

@@ -18,7 +18,6 @@
# [This file includes modifications made by New Vector Limited]
#
#
import importlib.resources as importlib_resources
import os
import re
from email.parser import Parser
@@ -26,6 +25,8 @@ from http import HTTPStatus
from typing import Any, Dict, List, Optional, Union
from unittest.mock import Mock
import pkg_resources
from twisted.internet.interfaces import IReactorTCP
from twisted.internet.testing import MemoryReactor
@@ -58,12 +59,11 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
config = self.default_config()
# Email config.
templates = (
importlib_resources.files("synapse").joinpath("res").joinpath("templates")
)
config["email"] = {
"enable_notifs": False,
"template_dir": os.path.abspath(str(templates)),
"template_dir": os.path.abspath(
pkg_resources.resource_filename("synapse", "res/templates")
),
"smtp_host": "127.0.0.1",
"smtp_port": 20,
"require_transport_security": False,
@@ -798,12 +798,11 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
config = self.default_config()
# Email config.
templates = (
importlib_resources.files("synapse").joinpath("res").joinpath("templates")
)
config["email"] = {
"enable_notifs": False,
"template_dir": os.path.abspath(str(templates)),
"template_dir": os.path.abspath(
pkg_resources.resource_filename("synapse", "res/templates")
),
"smtp_host": "127.0.0.1",
"smtp_port": 20,
"require_transport_security": False,

View File

@@ -18,8 +18,6 @@
# [This file includes modifications made by New Vector Limited]
#
#
from http import HTTPStatus
import synapse
from synapse.api.errors import Codes
from synapse.rest.client import login, push_rule, room
@@ -488,23 +486,3 @@ class PushRuleAttributesTestCase(HomeserverTestCase):
},
channel.json_body,
)
def test_no_user_defined_postcontent_rules(self) -> None:
"""
Tests that clients are not permitted to create MSC4306 `postcontent` rules.
"""
self.register_user("bob", "pass")
token = self.login("bob", "pass")
channel = self.make_request(
"PUT",
"/pushrules/global/postcontent/some.user.rule",
{},
access_token=token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST)
self.assertEqual(
Codes.INVALID_PARAM,
channel.json_body["errcode"],
)

View File

@@ -20,11 +20,12 @@
#
#
import datetime
import importlib.resources as importlib_resources
import os
from typing import Any, Dict, List, Tuple
from unittest.mock import AsyncMock
import pkg_resources
from twisted.internet.testing import MemoryReactor
import synapse.rest.admin
@@ -980,12 +981,11 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase):
# Email config.
templates = (
importlib_resources.files("synapse").joinpath("res").joinpath("templates")
)
config["email"] = {
"enable_notifs": True,
"template_dir": os.path.abspath(str(templates)),
"template_dir": os.path.abspath(
pkg_resources.resource_filename("synapse", "res/templates")
),
"expiry_template_html": "notice_expiry.html",
"expiry_template_text": "notice_expiry.txt",
"notif_template_html": "notif_mail.html",

View File

@@ -18,13 +18,27 @@
# [This file includes modifications made by New Vector Limited]
#
#
from parameterized import parameterized_class
from synapse.api.constants import EduTypes
from synapse.rest import admin
from synapse.rest.client import login, sendtodevice, sync
from synapse.types import JsonDict
from tests.unittest import HomeserverTestCase, override_config
@parameterized_class(
("sync_endpoint", "experimental_features"),
[
("/sync", {}),
(
"/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee",
# Enable sliding sync
{"msc3575_enabled": True},
),
],
)
class SendToDeviceTestCase(HomeserverTestCase):
"""
Test `/sendToDevice` will deliver messages across to people receiving them over `/sync`.
@@ -34,6 +48,9 @@ class SendToDeviceTestCase(HomeserverTestCase):
experimental_features: The experimental features homeserver config to use.
"""
sync_endpoint: str
experimental_features: JsonDict
servlets = [
admin.register_servlets,
login.register_servlets,
@@ -41,6 +58,11 @@ class SendToDeviceTestCase(HomeserverTestCase):
sync.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["experimental_features"] = self.experimental_features
return config
def test_user_to_user(self) -> None:
"""A to-device message from one user to another should get delivered"""
@@ -61,7 +83,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
self.assertEqual(chan.code, 200, chan.result)
# check it appears
channel = self.make_request("GET", "/sync", access_token=user2_tok)
channel = self.make_request("GET", self.sync_endpoint, access_token=user2_tok)
self.assertEqual(channel.code, 200, channel.result)
expected_result = {
"events": [
@@ -77,7 +99,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
# it should re-appear if we do another sync because the to-device message is not
# deleted until we acknowledge it by sending a `?since=...` parameter in the
# next sync request corresponding to the `next_batch` value from the response.
channel = self.make_request("GET", "/sync", access_token=user2_tok)
channel = self.make_request("GET", self.sync_endpoint, access_token=user2_tok)
self.assertEqual(channel.code, 200, channel.result)
self.assertEqual(channel.json_body["to_device"], expected_result)
@@ -85,7 +107,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
sync_token = channel.json_body["next_batch"]
channel = self.make_request(
"GET",
f"/sync?since={sync_token}",
f"{self.sync_endpoint}?since={sync_token}",
access_token=user2_tok,
)
self.assertEqual(channel.code, 200, channel.result)
@@ -111,7 +133,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
self.assertEqual(chan.code, 200, chan.result)
# now sync: we should get two of the three (because burst_count=2)
channel = self.make_request("GET", "/sync", access_token=user2_tok)
channel = self.make_request("GET", self.sync_endpoint, access_token=user2_tok)
self.assertEqual(channel.code, 200, channel.result)
msgs = channel.json_body["to_device"]["events"]
self.assertEqual(len(msgs), 2)
@@ -141,7 +163,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
# ... which should arrive
channel = self.make_request(
"GET",
f"/sync?since={sync_token}",
f"{self.sync_endpoint}?since={sync_token}",
access_token=user2_tok,
)
self.assertEqual(channel.code, 200, channel.result)
@@ -176,7 +198,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
)
# now sync: we should get two of the three
channel = self.make_request("GET", "/sync", access_token=user2_tok)
channel = self.make_request("GET", self.sync_endpoint, access_token=user2_tok)
self.assertEqual(channel.code, 200, channel.result)
msgs = channel.json_body["to_device"]["events"]
self.assertEqual(len(msgs), 2)
@@ -211,7 +233,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
# ... which should arrive
channel = self.make_request(
"GET",
f"/sync?since={sync_token}",
f"{self.sync_endpoint}?since={sync_token}",
access_token=user2_tok,
)
self.assertEqual(channel.code, 200, channel.result)
@@ -236,7 +258,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
user2_tok = self.login("u2", "pass", "d2")
# Do an initial sync
channel = self.make_request("GET", "/sync", access_token=user2_tok)
channel = self.make_request("GET", self.sync_endpoint, access_token=user2_tok)
self.assertEqual(channel.code, 200, channel.result)
sync_token = channel.json_body["next_batch"]
@@ -253,7 +275,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
channel = self.make_request(
"GET",
f"/sync?since={sync_token}&timeout=300000",
f"{self.sync_endpoint}?since={sync_token}&timeout=300000",
access_token=user2_tok,
)
self.assertEqual(channel.code, 200, channel.result)
@@ -263,7 +285,7 @@ class SendToDeviceTestCase(HomeserverTestCase):
channel = self.make_request(
"GET",
f"/sync?since={sync_token}&timeout=300000",
f"{self.sync_endpoint}?since={sync_token}&timeout=300000",
access_token=user2_tok,
)
self.assertEqual(channel.code, 200, channel.result)

View File

@@ -22,7 +22,7 @@ import json
import logging
from typing import List
from parameterized import parameterized
from parameterized import parameterized, parameterized_class
from twisted.internet.testing import MemoryReactor
@@ -702,11 +702,29 @@ class SyncCacheTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.code, 200, channel.json_body)
@parameterized_class(
("sync_endpoint", "experimental_features"),
[
("/sync", {}),
(
"/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee",
# Enable sliding sync
{"msc3575_enabled": True},
),
],
)
class DeviceListSyncTestCase(unittest.HomeserverTestCase):
"""
Tests regarding device list (`device_lists`) changes.
Attributes:
sync_endpoint: The endpoint under test to use for syncing.
experimental_features: The experimental features homeserver config to use.
"""
sync_endpoint: str
experimental_features: JsonDict
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
@@ -715,6 +733,11 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
devices.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["experimental_features"] = self.experimental_features
return config
def test_receiving_local_device_list_changes(self) -> None:
"""Tests that a local users that share a room receive each other's device list
changes.
@@ -744,7 +767,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
# Now have Bob initiate an initial sync (in order to get a since token)
channel = self.make_request(
"GET",
"/sync",
self.sync_endpoint,
access_token=bob_access_token,
)
self.assertEqual(channel.code, 200, channel.json_body)
@@ -754,7 +777,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
# which we hope will happen as a result of Alice updating their device list.
bob_sync_channel = self.make_request(
"GET",
f"/sync?since={next_batch_token}&timeout=30000",
f"{self.sync_endpoint}?since={next_batch_token}&timeout=30000",
access_token=bob_access_token,
# Start the request, then continue on.
await_result=False,
@@ -801,7 +824,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
# Have Bob initiate an initial sync (in order to get a since token)
channel = self.make_request(
"GET",
"/sync",
self.sync_endpoint,
access_token=bob_access_token,
)
self.assertEqual(channel.code, 200, channel.json_body)
@@ -811,7 +834,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
# which we hope will happen as a result of Alice updating their device list.
bob_sync_channel = self.make_request(
"GET",
f"/sync?since={next_batch_token}&timeout=1000",
f"{self.sync_endpoint}?since={next_batch_token}&timeout=1000",
access_token=bob_access_token,
# Start the request, then continue on.
await_result=False,
@@ -850,7 +873,9 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
)
# Request an initial sync
channel = self.make_request("GET", "/sync", access_token=alice_access_token)
channel = self.make_request(
"GET", self.sync_endpoint, access_token=alice_access_token
)
self.assertEqual(channel.code, 200, channel.json_body)
next_batch = channel.json_body["next_batch"]
@@ -858,7 +883,7 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
# It won't return until something has happened
incremental_sync_channel = self.make_request(
"GET",
f"/sync?since={next_batch}&timeout=30000",
f"{self.sync_endpoint}?since={next_batch}&timeout=30000",
access_token=alice_access_token,
await_result=False,
)
@@ -888,6 +913,17 @@ class DeviceListSyncTestCase(unittest.HomeserverTestCase):
)
@parameterized_class(
("sync_endpoint", "experimental_features"),
[
("/sync", {}),
(
"/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee",
# Enable sliding sync
{"msc3575_enabled": True},
),
],
)
class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
"""
Tests regarding device one time keys (`device_one_time_keys_count`) changes.
@@ -897,6 +933,9 @@ class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
experimental_features: The experimental features homeserver config to use.
"""
sync_endpoint: str
experimental_features: JsonDict
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
@@ -904,6 +943,11 @@ class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
devices.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["experimental_features"] = self.experimental_features
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.e2e_keys_handler = hs.get_e2e_keys_handler()
@@ -920,7 +964,9 @@ class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
)
# Request an initial sync
channel = self.make_request("GET", "/sync", access_token=alice_access_token)
channel = self.make_request(
"GET", self.sync_endpoint, access_token=alice_access_token
)
self.assertEqual(channel.code, 200, channel.json_body)
# Check for those one time key counts
@@ -965,7 +1011,9 @@ class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
)
# Request an initial sync
channel = self.make_request("GET", "/sync", access_token=alice_access_token)
channel = self.make_request(
"GET", self.sync_endpoint, access_token=alice_access_token
)
self.assertEqual(channel.code, 200, channel.json_body)
# Check for those one time key counts
@@ -976,6 +1024,17 @@ class DeviceOneTimeKeysSyncTestCase(unittest.HomeserverTestCase):
)
@parameterized_class(
("sync_endpoint", "experimental_features"),
[
("/sync", {}),
(
"/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee",
# Enable sliding sync
{"msc3575_enabled": True},
),
],
)
class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
"""
Tests regarding device one time keys (`device_unused_fallback_key_types`) changes.
@@ -985,6 +1044,9 @@ class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
experimental_features: The experimental features homeserver config to use.
"""
sync_endpoint: str
experimental_features: JsonDict
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
@@ -992,6 +1054,11 @@ class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
devices.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
config["experimental_features"] = self.experimental_features
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = self.hs.get_datastores().main
self.e2e_keys_handler = hs.get_e2e_keys_handler()
@@ -1011,7 +1078,9 @@ class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
)
# Request an initial sync
channel = self.make_request("GET", "/sync", access_token=alice_access_token)
channel = self.make_request(
"GET", self.sync_endpoint, access_token=alice_access_token
)
self.assertEqual(channel.code, 200, channel.json_body)
# Check for those one time key counts
@@ -1053,7 +1122,9 @@ class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
self.assertEqual(fallback_res, ["alg1"], fallback_res)
# Request an initial sync
channel = self.make_request("GET", "/sync", access_token=alice_access_token)
channel = self.make_request(
"GET", self.sync_endpoint, access_token=alice_access_token
)
self.assertEqual(channel.code, 200, channel.json_body)
# Check for the unused fallback key types

View File

@@ -1145,9 +1145,6 @@ def setup_test_homeserver(
reactor=reactor,
)
# Register the cleanup hook
cleanup_func(hs.cleanup)
# Install @cache_in_self attributes
for key, val in kwargs.items():
setattr(hs, "_" + key, val)
@@ -1160,16 +1157,6 @@ def setup_test_homeserver(
with patch("synapse.storage.database.make_pool", side_effect=make_fake_db_pool):
hs.setup()
# Register background tasks required by this server. This must be done
# somewhat manually due to the background tasks not being registered
# unless handlers are instantiated.
#
# Since, we don't have to worry about `daemonize` (forking the process) in tests, we
# can just start the background tasks straight away after `hs.setup`. (compare this
# with where we call `hs.start_background_tasks()` outside of the test environment).
if hs.config.worker.run_background_tasks:
hs.start_background_tasks()
# Since we've changed the databases to run DB transactions on the same
# thread, we need to stop the event fetcher hogging that one thread.
hs.get_datastores().main.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING = False

View File

@@ -20,7 +20,7 @@
#
import logging
from typing import List, Optional
from typing import Dict, List, Optional
from twisted.internet.testing import MemoryReactor
@@ -39,6 +39,77 @@ from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
class EventsTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self._store = self.hs.get_datastores().main
def test_get_senders_for_event_ids(self) -> None:
"""Tests the `get_senders_for_event_ids` storage function."""
users_and_tokens: Dict[str, str] = {}
for localpart_suffix in range(10):
localpart = f"user_{localpart_suffix}"
user_id = self.register_user(localpart, "rabbit")
token = self.login(localpart, "rabbit")
users_and_tokens[user_id] = token
room_creator_user_id = self.register_user("room_creator", "rabbit")
room_creator_token = self.login("room_creator", "rabbit")
users_and_tokens[room_creator_user_id] = room_creator_token
# Create a room and invite some users.
room_id = self.helper.create_room_as(
room_creator_user_id, tok=room_creator_token
)
event_ids_to_senders: Dict[str, str] = {}
for user_id, token in users_and_tokens.items():
if user_id == room_creator_user_id:
continue
self.helper.invite(
room=room_id,
targ=user_id,
tok=room_creator_token,
)
# Have the user accept the invite and join the room.
self.helper.join(
room=room_id,
user=user_id,
tok=token,
)
# Have the user send an event.
response = self.helper.send_event(
room_id=room_id,
type="m.room.message",
content={
"msgtype": "m.text",
"body": f"hello, I'm {user_id}!",
},
tok=token,
)
# Record the event ID and sender.
event_id = response["event_id"]
event_ids_to_senders[event_id] = user_id
# Check that `get_senders_for_event_ids` returns the correct data.
response = self.get_success(
self._store.get_senders_for_event_ids(list(event_ids_to_senders.keys()))
)
self.assert_dict(event_ids_to_senders, response)
class ExtremPruneTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,