Compare commits

...

19 Commits

Author SHA1 Message Date
Erik Johnston
40ec6110b0 Newsfile 2024-10-07 11:39:01 +01:00
Erik Johnston
b8e4f8427e Add missing license header 2024-10-07 11:37:42 +01:00
V02460
e8e0f0fad7 Add config option redis.password_path (#17717)
Adds the option to load the Redis password from a file, instead of
giving it in the config directly. The code is similar to how it’s done
for `registration_shared_secret_path`. I changed the example in the
documentation to represent the best practice regarding the handling of
secrets.

Reading secrets from files has the security advantage of separating the
secrets from the config. It also simplifies secrets management in
Kubernetes.
2024-10-07 09:46:51 +01:00
Henrique
beb7a951f4 docs: add note about PYTHONMALLOC for accurate jemalloc memory tracking (#17709)
Added a note in the documentation suggesting that users may set
`PYTHONMALLOC=malloc` when using `jemalloc`. This allows jemalloc to
track memory usage more accurately by bypassing Python's internal
small-object allocator (`pymalloc`), helping to ensure that
`cache_autotuning` functions as expected.

This doc change aims to provide more clarity for users configuring
jemalloc with Synapse.


Based on:
4ac783549c/synapse/metrics/jemalloc.py (L198-L201)
2024-10-07 08:37:39 +00:00
dependabot[bot]
d34f827ed8 Bump python-multipart from 0.0.10 to 0.0.12 (#17772) 2024-10-07 09:14:30 +01:00
Andrew Ferrazzutti
9920417723 Don't say MSC4140 is supported when it's disabled (#17780) 2024-10-04 13:42:34 +01:00
Andrew Morgan
316d635906 Fix NAME attribute of ReplicationRemovePusherRestServlet (#17779) 2024-10-04 09:53:35 +01:00
Dirk Klimpel
8bbe66a9b9 explain load balancing for federation_sender_instances (#17776)
Adding information on how the load is distributed for
`federation_sender_instances`.

Thx to @devonh for the information.

causal source:
c2e5e9e67c/synapse/config/_base.py (L946-L989)

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [x] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct
(run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Devon Hudson <devon.dmytro@gmail.com>
2024-10-03 22:01:33 +00:00
Andrew Morgan
d4e3ad04cd Merge branch 'master' into develop 2024-10-01 12:18:22 +01:00
Andrew Morgan
55c0391cc8 1.116.0 2024-10-01 11:14:13 +01:00
Erik Johnston
81e0f57800 Fix perf when streams don't change often (#17767)
There is a bug with the `StreamChangeCache` where it would incorrectly
return that all entities had changed if asked for entities changed
*since* the earliest stream position.

Note that for streams we use the inequalities: `$min_stream_id <
stream_id <= $max_stream_id`, i.e. when we ask the stream change cache
for all things that have changed since `$stream_id` we don't care for
events that happened *at* `$stream_id`.

Specifically: `_earliest_known_stream_pos` is the position at which we
know that we'll have entries for all changes since that point, we can
use the cache for any stream IDs that equal
`_earliest_known_stream_pos`.

`_earliest_known_stream_pos` is set in three places:
- On startup we set it either to:
  - the current maximum stream ID, with not prefilled values; or
  - the minimum of the latest N values we pulled from the DB
- When we evict items from the bottom, we set it to the stream ID of the
evicted items.

This was changed in https://github.com/matrix-org/synapse/pull/14435,
but I think we were overly conservative there.

---------

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2024-09-30 13:52:33 +01:00
Erik Johnston
ae4862c38f Optimise notifier mk2 (#17766)
Based on #17765.

Basically the idea is to reduce the overhead of calling
`ObservableDeferred` in a loop. The two gains are: a) just using a list
of deferreds rather than the machinery of `ObservableDeferred`, and b)
only calling `PreseverLoggingContext` once.

`PreseverLoggingContext` in particular is expensive to call a lot as
each time it needs to call `get_thread_resource_usage` twice, so that it
an update the CPU metrics of the log context.
2024-09-30 13:32:31 +01:00
dependabot[bot]
602956ef64 Bump ruff from 0.6.7 to 0.6.8 (#17774) 2024-09-30 13:08:56 +01:00
dependabot[bot]
444b565c76 Bump phonenumbers from 8.13.45 to 8.13.46 (#17773) 2024-09-30 13:07:57 +01:00
dependabot[bot]
8068f31146 Bump regex from 1.10.6 to 1.11.0 (#17770) 2024-09-30 13:06:43 +01:00
Erik Johnston
5210565c12 Reduce overhead of sliding sync E2EE loops (#17771)
Mainly toning down logging and only calling
`get_membership_from_event_ids` if something has changed.
2024-09-30 13:00:14 +01:00
Erik Johnston
de955293cf Add fast path for sliding sync streams that only ask for extensions (#17768)
Principally useful for EX e2ee sliding sync connections.
2024-09-30 12:59:50 +01:00
Erik Johnston
93889eb2e7 Optimise notifier (#17765)
The notifier is quite inefficient when it has to wake up many user
streams all at once

From a silly benchmark this takes the time to notify 1M user streams
from ~30s to ~5s
2024-09-30 12:58:13 +01:00
Erik Johnston
ece66ba61c Minor perf speed up for large accounts on SSS (#17751)
This works as instead of passing *all* rooms to `record_sent_rooms` we
only need to pass rooms that were previously not in the LIVE state.

This came from a py-spy where we were spending ~10% CPU calling these
functions. Note that `record_sent_rooms` is a no-op for rooms that are
already in the `LIVE` state, so we only need to call them for
`PREVIOUSLY` or `INITIAL` rooms.
2024-09-30 12:58:02 +01:00
34 changed files with 333 additions and 132 deletions

View File

@@ -1,3 +1,10 @@
# Synapse 1.116.0 (2024-10-01)
No significant changes since 1.116.0rc2.
# Synapse 1.116.0rc2 (2024-09-26)
### Features

12
Cargo.lock generated
View File

@@ -444,9 +444,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.10.6"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
dependencies = [
"aho-corasick",
"memchr",
@@ -456,9 +456,9 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.4.6"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea"
checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3"
dependencies = [
"aho-corasick",
"memchr",
@@ -467,9 +467,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.8.3"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "ryu"

1
changelog.d/17709.doc Normal file
View File

@@ -0,0 +1 @@
Add documentation note about PYTHONMALLOC for accurate jemalloc memory tracking. Contributed by @hensg.

View File

@@ -0,0 +1 @@
Add config option `redis.password_path`.

1
changelog.d/17751.misc Normal file
View File

@@ -0,0 +1 @@
Minor performance increase for large accounts using sliding sync.

1
changelog.d/17765.misc Normal file
View File

@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.

1
changelog.d/17766.misc Normal file
View File

@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.

1
changelog.d/17767.misc Normal file
View File

@@ -0,0 +1 @@
Fix performance of streams that don't change often.

1
changelog.d/17768.misc Normal file
View File

@@ -0,0 +1 @@
Improve performance of sliding sync connections that do not ask for any rooms.

1
changelog.d/17771.misc Normal file
View File

@@ -0,0 +1 @@
Reduce overhead of sliding sync E2EE loops.

1
changelog.d/17776.doc Normal file
View File

@@ -0,0 +1 @@
Explain how load balancing works for `federation_sender_instances`.

1
changelog.d/17779.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a rare bug introduced in v1.29.0 where invalidating a user's access token from a worker could raise an error.

1
changelog.d/17780.bugfix Normal file
View File

@@ -0,0 +1 @@
In the response to `GET /_matrix/client/versions`, set the `unstable_features` flag for MSC4140 to `false` when server configuration disables support for delayed events.

1
changelog.d/17793.misc Normal file
View File

@@ -0,0 +1 @@
Add missing license header.

6
debian/changelog vendored
View File

@@ -1,3 +1,9 @@
matrix-synapse-py3 (1.116.0) stable; urgency=medium
* New Synapse release 1.116.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 01 Oct 2024 11:14:07 +0100
matrix-synapse-py3 (1.116.0~rc2) stable; urgency=medium
* New synapse release 1.116.0rc2.

View File

@@ -255,6 +255,8 @@ line to `/etc/default/matrix-synapse`:
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
*Note*: You may need to set `PYTHONMALLOC=malloc` to ensure that `jemalloc` can accurately calculate memory usage. By default, Python uses its internal small-object allocator, which may interfere with jemalloc's ability to track memory consumption correctly. This could prevent the [cache_autotuning](../configuration/config_documentation.md#caches-and-associated-values) feature from functioning as expected, as the Python allocator may not reach the memory threshold set by `max_cache_memory_usage`, thus not triggering the cache eviction process.
This made a significant difference on Python 2.7 - it's unclear how
much of an improvement it provides on Python 3.x.

View File

@@ -4368,7 +4368,13 @@ It is possible to scale the processes that handle sending outbound federation re
by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding it's [`worker_name`](#worker_name) to
a `federation_sender_instances` map. Doing so will remove handling of this function from
the main process. Multiple workers can be added to this map, in which case the work is
balanced across them.
balanced across them.
The way that the load balancing works is any outbound federation request will be assigned
to a federation sender worker based on the hash of the destination server name. This
means that all requests being sent to the same destination will be processed by the same
worker instance. Multiple `federation_sender_instances` are useful if there is a federation
with multiple servers.
This configuration setting must be shared between all workers handling federation
sending, and if changed all federation sender workers must be stopped at the same time
@@ -4518,6 +4524,9 @@ This setting has the following sub-options:
* `path`: The full path to a local Unix socket file. **If this is used, `host` and
`port` are ignored.** Defaults to `/tmp/redis.sock'
* `password`: Optional password if configured on the Redis instance.
* `password_path`: Alternative to `password`, reading the password from an
external file. The file should be a plain text file, containing only the
password. Synapse reads the password from the given file once at startup.
* `dbid`: Optional redis dbid if needs to connect to specific redis logical db.
* `use_tls`: Whether to use tls connection. Defaults to false.
* `certificate_file`: Optional path to the certificate file
@@ -4531,13 +4540,16 @@ This setting has the following sub-options:
_Changed in Synapse 1.85.0: Added path option to use a local Unix socket_
_Changed in Synapse 1.116.0: Added password\_path_
Example configuration:
```yaml
redis:
enabled: true
host: localhost
port: 6379
password: <secret_password>
password_path: <path_to_the_password_file>
# OR password: <secret_password>
dbid: <dbid>
#use_tls: True
#certificate_file: <path_to_the_certificate_file>

52
poetry.lock generated
View File

@@ -1447,13 +1447,13 @@ dev = ["jinja2"]
[[package]]
name = "phonenumbers"
version = "8.13.45"
version = "8.13.46"
description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers."
optional = false
python-versions = "*"
files = [
{file = "phonenumbers-8.13.45-py2.py3-none-any.whl", hash = "sha256:bf05ec20fcd13f0d53e43a34ed7bd1c8be26a72b88fce4b8c64fca5b4641987a"},
{file = "phonenumbers-8.13.45.tar.gz", hash = "sha256:53679a95b6060fd5e15467759252c87933d8566d6a5be00995a579eb0e02435b"},
{file = "phonenumbers-8.13.46-py2.py3-none-any.whl", hash = "sha256:519422d407af066fdbf98e179ea2e214487060f26526d67871f817eefbbb2134"},
{file = "phonenumbers-8.13.46.tar.gz", hash = "sha256:94bf18ba9725bb6868d29473b13f78ef01e2585c5cb561ec0200be7676e77452"},
]
[[package]]
@@ -1974,13 +1974,13 @@ six = ">=1.5"
[[package]]
name = "python-multipart"
version = "0.0.10"
version = "0.0.12"
description = "A streaming multipart parser for Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "python_multipart-0.0.10-py3-none-any.whl", hash = "sha256:2b06ad9e8d50c7a8db80e3b56dab590137b323410605af2be20d62a5f1ba1dc8"},
{file = "python_multipart-0.0.10.tar.gz", hash = "sha256:46eb3c6ce6fdda5fb1a03c7e11d490e407c6930a2703fe7aef4da71c374688fa"},
{file = "python_multipart-0.0.12-py3-none-any.whl", hash = "sha256:43dcf96cf65888a9cd3423544dd0d75ac10f7aa0c3c28a175bbcd00c9ce1aebf"},
{file = "python_multipart-0.0.12.tar.gz", hash = "sha256:045e1f98d719c1ce085ed7f7e1ef9d8ccc8c02ba02b5566d5f7521410ced58cb"},
]
[[package]]
@@ -2277,29 +2277,29 @@ files = [
[[package]]
name = "ruff"
version = "0.6.7"
version = "0.6.8"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
files = [
{file = "ruff-0.6.7-py3-none-linux_armv6l.whl", hash = "sha256:08277b217534bfdcc2e1377f7f933e1c7957453e8a79764d004e44c40db923f2"},
{file = "ruff-0.6.7-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:c6707a32e03b791f4448dc0dce24b636cbcdee4dd5607adc24e5ee73fd86c00a"},
{file = "ruff-0.6.7-py3-none-macosx_11_0_arm64.whl", hash = "sha256:533d66b7774ef224e7cf91506a7dafcc9e8ec7c059263ec46629e54e7b1f90ab"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:17a86aac6f915932d259f7bec79173e356165518859f94649d8c50b81ff087e9"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:b3f8822defd260ae2460ea3832b24d37d203c3577f48b055590a426a722d50ef"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9ba4efe5c6dbbb58be58dd83feedb83b5e95c00091bf09987b4baf510fee5c99"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:525201b77f94d2b54868f0cbe5edc018e64c22563da6c5c2e5c107a4e85c1c0d"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8854450839f339e1049fdbe15d875384242b8e85d5c6947bb2faad33c651020b"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2f0b62056246234d59cbf2ea66e84812dc9ec4540518e37553513392c171cb18"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6b1462fa56c832dc0cea5b4041cfc9c97813505d11cce74ebc6d1aae068de36b"},
{file = "ruff-0.6.7-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:02b083770e4cdb1495ed313f5694c62808e71764ec6ee5db84eedd82fd32d8f5"},
{file = "ruff-0.6.7-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:0c05fd37013de36dfa883a3854fae57b3113aaa8abf5dea79202675991d48624"},
{file = "ruff-0.6.7-py3-none-musllinux_1_2_i686.whl", hash = "sha256:f49c9caa28d9bbfac4a637ae10327b3db00f47d038f3fbb2195c4d682e925b14"},
{file = "ruff-0.6.7-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:a0e1655868164e114ba43a908fd2d64a271a23660195017c17691fb6355d59bb"},
{file = "ruff-0.6.7-py3-none-win32.whl", hash = "sha256:a939ca435b49f6966a7dd64b765c9df16f1faed0ca3b6f16acdf7731969deb35"},
{file = "ruff-0.6.7-py3-none-win_amd64.whl", hash = "sha256:590445eec5653f36248584579c06252ad2e110a5d1f32db5420de35fb0e1c977"},
{file = "ruff-0.6.7-py3-none-win_arm64.whl", hash = "sha256:b28f0d5e2f771c1fe3c7a45d3f53916fc74a480698c4b5731f0bea61e52137c8"},
{file = "ruff-0.6.7.tar.gz", hash = "sha256:44e52129d82266fa59b587e2cd74def5637b730a69c4542525dfdecfaae38bd5"},
{file = "ruff-0.6.8-py3-none-linux_armv6l.whl", hash = "sha256:77944bca110ff0a43b768f05a529fecd0706aac7bcce36d7f1eeb4cbfca5f0f2"},
{file = "ruff-0.6.8-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:27b87e1801e786cd6ede4ada3faa5e254ce774de835e6723fd94551464c56b8c"},
{file = "ruff-0.6.8-py3-none-macosx_11_0_arm64.whl", hash = "sha256:cd48f945da2a6334f1793d7f701725a76ba93bf3d73c36f6b21fb04d5338dcf5"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:677e03c00f37c66cea033274295a983c7c546edea5043d0c798833adf4cf4c6f"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:9f1476236b3eacfacfc0f66aa9e6cd39f2a624cb73ea99189556015f27c0bdeb"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6f5a2f17c7d32991169195d52a04c95b256378bbf0de8cb98478351eb70d526f"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:5fd0d4b7b1457c49e435ee1e437900ced9b35cb8dc5178921dfb7d98d65a08d0"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f8034b19b993e9601f2ddf2c517451e17a6ab5cdb1c13fdff50c1442a7171d87"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6cfb227b932ba8ef6e56c9f875d987973cd5e35bc5d05f5abf045af78ad8e098"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ef0411eccfc3909269fed47c61ffebdcb84a04504bafa6b6df9b85c27e813b0"},
{file = "ruff-0.6.8-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:007dee844738c3d2e6c24ab5bc7d43c99ba3e1943bd2d95d598582e9c1b27750"},
{file = "ruff-0.6.8-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:ce60058d3cdd8490e5e5471ef086b3f1e90ab872b548814e35930e21d848c9ce"},
{file = "ruff-0.6.8-py3-none-musllinux_1_2_i686.whl", hash = "sha256:1085c455d1b3fdb8021ad534379c60353b81ba079712bce7a900e834859182fa"},
{file = "ruff-0.6.8-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:70edf6a93b19481affd287d696d9e311388d808671bc209fb8907b46a8c3af44"},
{file = "ruff-0.6.8-py3-none-win32.whl", hash = "sha256:792213f7be25316f9b46b854df80a77e0da87ec66691e8f012f887b4a671ab5a"},
{file = "ruff-0.6.8-py3-none-win_amd64.whl", hash = "sha256:ec0517dc0f37cad14a5319ba7bba6e7e339d03fbf967a6d69b0907d61be7a263"},
{file = "ruff-0.6.8-py3-none-win_arm64.whl", hash = "sha256:8d3bb2e3fbb9875172119021a13eed38849e762499e3cfde9588e4b4d70968dc"},
{file = "ruff-0.6.8.tar.gz", hash = "sha256:a5bf44b1aa0adaf6d9d20f86162b34f7c593bfedabc51239953e446aefc8ce18"},
]
[[package]]
@@ -3114,4 +3114,4 @@ user-search = ["pyicu"]
[metadata]
lock-version = "2.0"
python-versions = "^3.8.0"
content-hash = "93c267fac3428b764f954e6faa17937b9c97b1ed2bdafc41dd8f6cb5d2ce085b"
content-hash = "304d03b74d2886def69ae44ce5afaed21318db9f09aae91281e0f182e1660ffd"

View File

@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.116.0rc2"
version = "1.116.0"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"
@@ -320,7 +320,7 @@ all = [
# failing on new releases. Keeping lower bounds loose here means that dependabot
# can bump versions without having to update the content-hash in the lockfile.
# This helps prevents merge conflicts when running a batch of dependabot updates.
ruff = "0.6.7"
ruff = "0.6.8"
# Type checking only works with the pydantic.v1 compat module from pydantic v2
pydantic = "^2"

View File

@@ -338,7 +338,7 @@ class MSC3861DelegatedAuth(BaseAuth):
logger.exception("Failed to introspect token")
raise SynapseError(503, "Unable to introspect the access token")
logger.info(f"Introspection result: {introspection_result!r}")
logger.debug("Introspection result: %r", introspection_result)
# TODO: introspection verification should be more extensive, especially:
# - verify the audience

View File

@@ -21,10 +21,15 @@
from typing import Any
from synapse.config._base import Config
from synapse.config._base import Config, ConfigError, read_file
from synapse.types import JsonDict
from synapse.util.check_dependencies import check_requirements
CONFLICTING_PASSWORD_OPTS_ERROR = """\
You have configured both `redis.password` and `redis.password_path`.
These are mutually incompatible.
"""
class RedisConfig(Config):
section = "redis"
@@ -43,6 +48,17 @@ class RedisConfig(Config):
self.redis_path = redis_config.get("path", None)
self.redis_dbid = redis_config.get("dbid", None)
self.redis_password = redis_config.get("password")
redis_password_path = redis_config.get("password_path")
if redis_password_path:
if self.redis_password:
raise ConfigError(CONFLICTING_PASSWORD_OPTS_ERROR)
self.redis_password = read_file(
redis_password_path,
(
"redis",
"password_path",
),
).strip()
self.redis_use_tls = redis_config.get("use_tls", False)
self.redis_certificate = redis_config.get("certificate_file", None)

View File

@@ -48,6 +48,7 @@ from synapse.metrics.background_process_metrics import (
wrap_as_background_process,
)
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
from synapse.storage.databases.main.roommember import EventIdMembership
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import (
DeviceListUpdates,
@@ -222,7 +223,6 @@ class DeviceWorkerHandler:
return changed
@trace
@measure_func("device.get_user_ids_changed")
@cancellable
async def get_user_ids_changed(
self, user_id: str, from_token: StreamToken
@@ -290,9 +290,11 @@ class DeviceWorkerHandler:
memberships_to_fetch.add(delta.prev_event_id)
# Fetch all the memberships for the membership events
event_id_to_memberships = await self.store.get_membership_from_event_ids(
memberships_to_fetch
)
event_id_to_memberships: Mapping[str, Optional[EventIdMembership]] = {}
if memberships_to_fetch:
event_id_to_memberships = await self.store.get_membership_from_event_ids(
memberships_to_fetch
)
joined_invited_knocked = (
Membership.JOIN,
@@ -349,7 +351,6 @@ class DeviceWorkerHandler:
return device_list_updates
@measure_func("_generate_sync_entry_for_device_list")
async def generate_sync_entry_for_device_list(
self,
user_id: str,

View File

@@ -572,7 +572,8 @@ class SlidingSyncExtensionHandler:
# Now record which rooms are now up to data, and which rooms have
# pending updates to send.
new_connection_state.account_data.record_sent_rooms(relevant_room_ids)
new_connection_state.account_data.record_sent_rooms(previously_rooms.keys())
new_connection_state.account_data.record_sent_rooms(initial_rooms)
missing_updates = (
all_updates_since_the_from_token.keys() - relevant_room_ids
)
@@ -763,9 +764,10 @@ class SlidingSyncExtensionHandler:
room_id_to_receipt_map[room_id] = {"type": type, "content": content}
# Now we update the per-connection state to track which receipts we have
# and haven't sent down.
new_connection_state.receipts.record_sent_rooms(relevant_room_ids)
# Update the per-connection state to track which rooms we have sent
# all the receipts for.
new_connection_state.receipts.record_sent_rooms(previously_rooms.keys())
new_connection_state.receipts.record_sent_rooms(initial_rooms)
if from_token:
# Now find the set of rooms that may have receipts that we're not sending

View File

@@ -123,6 +123,19 @@ class SlidingSyncInterestedRooms:
newly_left_rooms: AbstractSet[str]
dm_room_ids: AbstractSet[str]
@staticmethod
def empty() -> "SlidingSyncInterestedRooms":
return SlidingSyncInterestedRooms(
lists={},
relevant_room_map={},
relevant_rooms_to_send_map={},
all_rooms=set(),
room_membership_for_user_map={},
newly_joined_rooms=set(),
newly_left_rooms=set(),
dm_room_ids=set(),
)
def filter_membership_for_sync(
*,
@@ -181,6 +194,14 @@ class SlidingSyncRoomLists:
from_token: Optional[StreamToken],
) -> SlidingSyncInterestedRooms:
"""Fetch the set of rooms that match the request"""
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
has_room_subscriptions = (
sync_config.room_subscriptions is not None
and len(sync_config.room_subscriptions) > 0
)
if not has_lists and not has_room_subscriptions:
return SlidingSyncInterestedRooms.empty()
if await self.store.have_finished_sliding_sync_background_jobs():
return await self._compute_interested_rooms_new_tables(

View File

@@ -1039,7 +1039,7 @@ class _MultipartParserProtocol(protocol.Protocol):
self.deferred = deferred
self.boundary = boundary
self.max_length = max_length
self.parser = None
self.parser: Optional[multipart.MultipartParser] = None
self.multipart_response = MultipartResponse()
self.has_redirect = False
self.in_json = False
@@ -1097,7 +1097,7 @@ class _MultipartParserProtocol(protocol.Protocol):
self.deferred.errback()
self.file_length += end - start
callbacks = {
callbacks: "multipart.multipart.MultipartCallbacks" = {
"on_header_field": on_header_field,
"on_header_value": on_header_value,
"on_part_data": on_part_data,
@@ -1113,7 +1113,7 @@ class _MultipartParserProtocol(protocol.Protocol):
self.transport.abortConnection()
try:
self.parser.write(incoming_data) # type: ignore[attr-defined]
self.parser.write(incoming_data)
except Exception as e:
logger.warning(f"Exception writing to multipart parser: {e}")
self.deferred.errback()

View File

@@ -41,6 +41,7 @@ import attr
from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.defer import Deferred
from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
from synapse.api.errors import AuthError
@@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
ISynapseReactor,
JsonDict,
MultiWriterStreamToken,
PersistedEventPosition,
@@ -61,8 +63,11 @@ from synapse.types import (
StreamToken,
UserID,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.async_helpers import (
timeout_deferred,
)
from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -89,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
return n
class _NotificationListener:
"""This represents a single client connection to the events stream.
The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred.
"""
__slots__ = ["deferred"]
def __init__(self, deferred: "defer.Deferred"):
self.deferred = deferred
class _NotifierUserStream:
"""This represents a user connected to the event stream.
It tracks the most recent stream token for that user.
@@ -113,59 +106,49 @@ class _NotifierUserStream:
def __init__(
self,
reactor: ISynapseReactor,
user_id: str,
rooms: StrCollection,
current_token: StreamToken,
time_now_ms: int,
):
self.reactor = reactor
self.user_id = user_id
self.rooms = set(rooms)
self.current_token = current_token
# The last token for which we should wake up any streams that have a
# token that comes before it. This gets updated every time we get poked.
# We start it at the current token since if we get any streams
# that have a token from before we have no idea whether they should be
# woken up or not, so lets just wake them up.
self.last_notified_token = current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms
self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
defer.Deferred()
)
# Set of listeners that we need to wake up when there has been a change.
self.listeners: Set[Deferred[StreamToken]] = set()
def notify(
def update_and_fetch_deferreds(
self,
stream_key: StreamKeyType,
stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
current_token: StreamToken,
time_now_ms: int,
) -> None:
"""Notify any listeners for this user of a new event from an
event source.
) -> Collection["Deferred[StreamToken]"]:
"""Update the stream for this user because of a new event from an
event source, and return the set of deferreds to wake up.
Args:
stream_key: The stream the event came from.
stream_id: The new id for the stream the event came from.
current_token: The new current token.
time_now_ms: The current time in milliseconds.
Returns:
The set of deferreds that need to be called.
"""
self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
self.last_notified_token = self.current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms
notify_deferred = self.notify_deferred
log_kv(
{
"notify": self.user_id,
"stream": stream_key,
"stream_id": stream_id,
"listeners": self.count_listeners(),
}
)
listeners = self.listeners
self.listeners = set()
users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
notify_deferred.callback(self.current_token)
return listeners
def remove(self, notifier: "Notifier") -> None:
"""Remove this listener from all the indexes in the Notifier
@@ -179,9 +162,9 @@ class _NotifierUserStream:
notifier.user_to_user_stream.pop(self.user_id)
def count_listeners(self) -> int:
return len(self.notify_deferred.observers())
return len(self.listeners)
def new_listener(self, token: StreamToken) -> _NotificationListener:
def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
"""Returns a deferred that is resolved when there is a new token
greater than the given token.
@@ -191,10 +174,17 @@ class _NotifierUserStream:
"""
# Immediately wake up stream if something has already since happened
# since their last token.
if self.last_notified_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
if token != self.current_token:
return defer.succeed(self.current_token)
# Create a new deferred and add it to the set of listeners. We add a
# cancel handler to remove it from the set again, to handle timeouts.
deferred: "Deferred[StreamToken]" = Deferred(
canceller=lambda d: self.listeners.discard(d)
)
self.listeners.add(deferred)
return deferred
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -247,6 +237,7 @@ class Notifier:
# List of callbacks to be notified when a lock is released
self._lock_released_callback: List[Callable[[str, str, str], None]] = []
self.reactor = hs.get_reactor()
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
@@ -342,14 +333,25 @@ class Notifier:
# Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
listeners.extend(
user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
)
except Exception:
logger.exception("Failed to notify listener")
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
len(user_streams)
)
# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
self.notify_replication()
@@ -519,12 +521,16 @@ class Notifier:
rooms = rooms or []
with Measure(self.clock, "on_new_event"):
user_streams = set()
user_streams: Set[_NotifierUserStream] = set()
log_kv(
{
"waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms),
"users": shortstr(users),
"rooms": shortstr(rooms),
"stream": stream_key,
"stream_id": new_token,
}
)
@@ -544,12 +550,27 @@ class Notifier:
)
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(stream_key, new_token, time_now_ms)
listeners.extend(
user_stream.update_and_fetch_deferreds(
current_token, time_now_ms
)
)
except Exception:
logger.exception("Failed to notify listener")
# We resolve all these deferreds in one go so that we only need to
# call `PreserveLoggingContext` once, as it has a bunch of overhead
# (to calculate performance stats)
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
self.notify_replication()
# Notify appservices.
@@ -586,6 +607,7 @@ class Notifier:
if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
reactor=self.reactor,
user_id=user_id,
rooms=room_ids,
current_token=current_token,
@@ -608,8 +630,8 @@ class Notifier:
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
listener.deferred = timeout_deferred(
listener.deferred,
listener = timeout_deferred(
listener,
(end_time - now) / 1000.0,
self.hs.get_reactor(),
)
@@ -622,7 +644,7 @@ class Notifier:
)
with PreserveLoggingContext():
await listener.deferred
await listener
log_kv(
{

View File

@@ -48,7 +48,7 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
"""
NAME = "add_user_account_data"
NAME = "remove_pusher"
PATH_ARGS = ("user_id",)
CACHE = False

View File

@@ -172,7 +172,7 @@ class VersionsRestServlet(RestServlet):
)
),
# MSC4140: Delayed events
"org.matrix.msc4140": True,
"org.matrix.msc4140": bool(self.config.server.max_event_delay_ms),
# MSC4151: Report room API (Client-Server API)
"org.matrix.msc4151": self.config.experimental.msc4151_enabled,
# Simplified sliding sync

View File

@@ -1,3 +1,16 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
CREATE TABLE delayed_events (
delay_id TEXT NOT NULL,
user_localpart TEXT NOT NULL,

View File

@@ -142,9 +142,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return that the entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
@@ -186,7 +186,7 @@ class StreamChangeCache:
This will be all entities if the given stream position is at or earlier
than the earliest known stream position.
"""
if not self._cache or stream_pos <= self._earliest_known_stream_pos:
if not self._cache or stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return set(entities)
@@ -238,9 +238,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return that an entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
@@ -270,9 +270,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return None to mark that it is unknown if an entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
return AllEntitiesChangedResult(None)
changed_entities: List[EntityType] = []

View File

@@ -19,13 +19,23 @@
# [This file includes modifications made by New Vector Limited]
#
#
import tempfile
from typing import Callable
import yaml
from parameterized import parameterized
from synapse.config import ConfigError
from synapse.config._base import RootConfig
from synapse.config.homeserver import HomeServerConfig
from tests.config.utils import ConfigFileTestCase
try:
import hiredis
except ImportError:
hiredis = None # type: ignore
class ConfigLoadingFileTestCase(ConfigFileTestCase):
def test_load_fails_if_server_name_missing(self) -> None:
@@ -116,3 +126,49 @@ class ConfigLoadingFileTestCase(ConfigFileTestCase):
self.add_lines_to_config(["trust_identity_server_for_password_resets: true"])
with self.assertRaises(ConfigError):
HomeServerConfig.load_config("", ["-c", self.config_file])
@parameterized.expand(
[
"turn_shared_secret_path: /does/not/exist",
"registration_shared_secret_path: /does/not/exist",
*["redis:\n enabled: true\n password_path: /does/not/exist"]
* (hiredis is not None),
]
)
def test_secret_files_missing(self, config_str: str) -> None:
self.generate_config()
self.add_lines_to_config(["", config_str])
with self.assertRaises(ConfigError):
HomeServerConfig.load_config("", ["-c", self.config_file])
@parameterized.expand(
[
(
"turn_shared_secret_path: {}",
lambda c: c.voip.turn_shared_secret,
),
(
"registration_shared_secret_path: {}",
lambda c: c.registration.registration_shared_secret,
),
*[
(
"redis:\n enabled: true\n password_path: {}",
lambda c: c.redis.redis_password,
)
]
* (hiredis is not None),
]
)
def test_secret_files_existing(
self, config_line: str, get_secret: Callable[[RootConfig], str]
) -> None:
self.generate_config_and_remove_lines_containing("registration_shared_secret")
with tempfile.NamedTemporaryFile(buffering=0) as secret_file:
secret_file.write(b"53C237")
self.add_lines_to_config(["", config_line.format(secret_file.name)])
config = HomeServerConfig.load_config("", ["-c", self.config_file])
self.assertEqual(get_secret(config), "53C237")

View File

@@ -8,11 +8,12 @@ from parameterized import parameterized
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.errors import Codes
from synapse.rest.client import delayed_events, room
from synapse.rest.client import delayed_events, room, versions
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from tests import unittest
from tests.unittest import HomeserverTestCase
PATH_PREFIX = "/_matrix/client/unstable/org.matrix.msc4140/delayed_events"
@@ -21,6 +22,21 @@ _HS_NAME = "red"
_EVENT_TYPE = "com.example.test"
class DelayedEventsUnstableSupportTestCase(HomeserverTestCase):
servlets = [versions.register_servlets]
def test_false_by_default(self) -> None:
channel = self.make_request("GET", "/_matrix/client/versions")
self.assertEqual(channel.code, 200, channel.result)
self.assertFalse(channel.json_body["unstable_features"]["org.matrix.msc4140"])
@unittest.override_config({"max_event_delay_duration": "24h"})
def test_true_if_enabled(self) -> None:
channel = self.make_request("GET", "/_matrix/client/versions")
self.assertEqual(channel.code, 200, channel.result)
self.assertTrue(channel.json_body["unstable_features"]["org.matrix.msc4140"])
class DelayedEventsTestCase(HomeserverTestCase):
"""Tests getting and managing delayed events."""

View File

@@ -282,22 +282,33 @@ class SyncTypingTests(unittest.HomeserverTestCase):
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]
# This should time out! But it does not, because our stream token is
# ahead, and therefore it's saying the typing (that we've actually
# already seen) is new, since it's got a token above our new, now-reset
# stream token.
channel = self.make_request("GET", sync_url % (access_token, next_batch))
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]
# Clear the typing information, so that it doesn't think everything is
# in the future.
# in the future. This happens automatically when the typing stream
# resets.
typing._reset()
# Now it SHOULD fail as it never completes!
# Nothing new, so we time out.
with self.assertRaises(TimedOutException):
self.make_request("GET", sync_url % (access_token, next_batch))
# Sync and start typing again.
sync_channel = self.make_request(
"GET", sync_url % (access_token, next_batch), await_result=False
)
self.assertFalse(sync_channel.is_finished())
channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": true, "timeout": 30000}',
)
self.assertEqual(200, channel.code)
# Sync should now return.
sync_channel.await_result()
self.assertEqual(200, sync_channel.code)
next_batch = sync_channel.json_body["next_batch"]
class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin):
servlets = [

View File

@@ -53,8 +53,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# return True, whether it's a known entity or not.
self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
self.assertTrue(cache.has_entity_changed("not@here.website", 0))
self.assertTrue(cache.has_entity_changed("user@foo.com", 3))
self.assertTrue(cache.has_entity_changed("not@here.website", 3))
self.assertTrue(cache.has_entity_changed("user@foo.com", 2))
self.assertTrue(cache.has_entity_changed("not@here.website", 2))
def test_entity_has_changed_pops_off_start(self) -> None:
"""
@@ -76,9 +76,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertTrue("user@foo.com" not in cache._entity_to_key)
self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
cache.get_all_entities_changed(2).entities,
["bar@baz.net", "user@elsewhere.org"],
)
self.assertFalse(cache.get_all_entities_changed(2).hit)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
# If we update an existing entity, it keeps the two existing entities
cache.entity_has_changed("bar@baz.net", 5)
@@ -89,7 +91,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
cache.get_all_entities_changed(3).entities,
["user@elsewhere.org", "bar@baz.net"],
)
self.assertFalse(cache.get_all_entities_changed(2).hit)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
def test_get_all_entities_changed(self) -> None:
"""
@@ -114,7 +117,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertFalse(cache.get_all_entities_changed(0).hit)
self.assertTrue(cache.get_all_entities_changed(1).hit)
# ... later, things gest more updates
cache.entity_has_changed("user@foo.com", 5)
@@ -149,7 +153,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# With no entities, it returns True for the past, present, and False for
# the future.
self.assertTrue(cache.has_any_entity_changed(0))
self.assertTrue(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(2))
# We add an entity