mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-07 01:20:16 +00:00
Compare commits
43 Commits
erikj/test
...
hs/use-mal
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7ea2cd1ce9 | ||
|
|
451f25172a | ||
|
|
91143bb24e | ||
|
|
47806b0869 | ||
|
|
a683028d81 | ||
|
|
7562d887e1 | ||
|
|
affaffb0ab | ||
|
|
63fb220e5f | ||
|
|
27c375f812 | ||
|
|
f4833e0c06 | ||
|
|
28c6841102 | ||
|
|
652a6b094d | ||
|
|
d1473f7362 | ||
|
|
dc6366a9bd | ||
|
|
86fb71431c | ||
|
|
b378d98c8f | ||
|
|
7967b36efe | ||
|
|
03318a766c | ||
|
|
2b2985b5cf | ||
|
|
51065c44bb | ||
|
|
6c84778549 | ||
|
|
765473567c | ||
|
|
b65ecaff9b | ||
|
|
4df26abf28 | ||
|
|
25f43faa70 | ||
|
|
8771b1337d | ||
|
|
eba431c539 | ||
|
|
a8803e2b6e | ||
|
|
ac88aca7f7 | ||
|
|
24f07a83e6 | ||
|
|
70f0ffd2fc | ||
|
|
d783880083 | ||
|
|
37623e3382 | ||
|
|
e2a443550e | ||
|
|
ef889c98a6 | ||
|
|
1fb9a2d0bf | ||
|
|
de8f0a03a3 | ||
|
|
d0aee697ac | ||
|
|
d5305000f1 | ||
|
|
e9eb3549d3 | ||
|
|
a61b13c0a1 | ||
|
|
0644ac0989 | ||
|
|
56c4b47df3 |
@@ -1,36 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from synapse.storage.engines import create_engine
|
||||
|
||||
logger = logging.getLogger("create_postgres_db")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Create a PostgresEngine.
|
||||
db_engine = create_engine({"name": "psycopg2", "args": {}})
|
||||
|
||||
# Connect to postgres to create the base database.
|
||||
# We use "postgres" as a database because it's bound to exist and the "synapse" one
|
||||
# doesn't exist yet.
|
||||
db_conn = db_engine.module.connect(
|
||||
user="postgres", host="postgres", password="postgres", dbname="postgres"
|
||||
)
|
||||
db_conn.autocommit = True
|
||||
cur = db_conn.cursor()
|
||||
cur.execute("CREATE DATABASE synapse;")
|
||||
cur.close()
|
||||
db_conn.close()
|
||||
31
.buildkite/scripts/postgres_exec.py
Executable file
31
.buildkite/scripts/postgres_exec.py
Executable file
@@ -0,0 +1,31 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import sys
|
||||
|
||||
import psycopg2
|
||||
|
||||
# a very simple replacment for `psql`, to make up for the lack of the postgres client
|
||||
# libraries in the synapse docker image.
|
||||
|
||||
# We use "postgres" as a database because it's bound to exist and the "synapse" one
|
||||
# doesn't exist yet.
|
||||
db_conn = psycopg2.connect(
|
||||
user="postgres", host="postgres", password="postgres", dbname="postgres"
|
||||
)
|
||||
db_conn.autocommit = True
|
||||
cur = db_conn.cursor()
|
||||
for c in sys.argv[1:]:
|
||||
cur.execute(c)
|
||||
@@ -1,10 +1,10 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Test script for 'synapse_port_db', which creates a virtualenv, installs Synapse along
|
||||
# with additional dependencies needed for the test (such as coverage or the PostgreSQL
|
||||
# driver), update the schema of the test SQLite database and run background updates on it,
|
||||
# create an empty test database in PostgreSQL, then run the 'synapse_port_db' script to
|
||||
# test porting the SQLite database to the PostgreSQL database (with coverage).
|
||||
# Test script for 'synapse_port_db'.
|
||||
# - sets up synapse and deps
|
||||
# - runs the port script on a prepopulated test sqlite db
|
||||
# - also runs it against an new sqlite db
|
||||
|
||||
|
||||
set -xe
|
||||
cd `dirname $0`/../..
|
||||
@@ -22,15 +22,32 @@ echo "--- Generate the signing key"
|
||||
# Generate the server's signing key.
|
||||
python -m synapse.app.homeserver --generate-keys -c .buildkite/sqlite-config.yaml
|
||||
|
||||
echo "--- Prepare the databases"
|
||||
echo "--- Prepare test database"
|
||||
|
||||
# Make sure the SQLite3 database is using the latest schema and has no pending background update.
|
||||
scripts-dev/update_database --database-config .buildkite/sqlite-config.yaml
|
||||
|
||||
# Create the PostgreSQL database.
|
||||
./.buildkite/scripts/create_postgres_db.py
|
||||
./.buildkite/scripts/postgres_exec.py "CREATE DATABASE synapse"
|
||||
|
||||
echo "+++ Run synapse_port_db"
|
||||
|
||||
# Run the script
|
||||
echo "+++ Run synapse_port_db against test database"
|
||||
coverage run scripts/synapse_port_db --sqlite-database .buildkite/test_db.db --postgres-config .buildkite/postgres-config.yaml
|
||||
|
||||
#####
|
||||
|
||||
# Now do the same again, on an empty database.
|
||||
|
||||
echo "--- Prepare empty SQLite database"
|
||||
|
||||
# we do this by deleting the sqlite db, and then doing the same again.
|
||||
rm .buildkite/test_db.db
|
||||
|
||||
scripts-dev/update_database --database-config .buildkite/sqlite-config.yaml
|
||||
|
||||
# re-create the PostgreSQL database.
|
||||
./.buildkite/scripts/postgres_exec.py \
|
||||
"DROP DATABASE synapse" \
|
||||
"CREATE DATABASE synapse"
|
||||
|
||||
echo "+++ Run synapse_port_db against empty database"
|
||||
coverage run scripts/synapse_port_db --sqlite-database .buildkite/test_db.db --postgres-config .buildkite/postgres-config.yaml
|
||||
|
||||
2
.github/workflows/tests.yml
vendored
2
.github/workflows/tests.yml
vendored
@@ -273,7 +273,7 @@ jobs:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Patch Buildkite-specific test scripts
|
||||
run: |
|
||||
sed -i -e 's/host="postgres"/host="localhost"/' .buildkite/scripts/create_postgres_db.py
|
||||
sed -i -e 's/host="postgres"/host="localhost"/' .buildkite/scripts/postgres_exec.py
|
||||
sed -i -e 's/host: postgres/host: localhost/' .buildkite/postgres-config.yaml
|
||||
sed -i -e 's|/src/||' .buildkite/{sqlite,postgres}-config.yaml
|
||||
sed -i -e 's/\$TOP/\$GITHUB_WORKSPACE/' .coveragerc
|
||||
|
||||
99
CHANGES.md
99
CHANGES.md
@@ -1,3 +1,102 @@
|
||||
Synapse 1.34.0rc1 (2021-05-12)
|
||||
==============================
|
||||
|
||||
This release deprecates the `room_invite_state_types` configuration setting. See the [upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.34.0/UPGRADE.rst#upgrading-to-v1340) for instructions on updating your configuration file to use the new `room_prejoin_state` setting.
|
||||
|
||||
This release also deprecates the `POST /_synapse/admin/v1/rooms/<room_id>/delete` admin API route. Server administrators are encouraged to update their scripts to use the new `DELETE /_synapse/admin/v1/rooms/<room_id>` route instead.
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Add experimental option to track memory usage of the caches. ([\#9881](https://github.com/matrix-org/synapse/issues/9881))
|
||||
- Add support for `DELETE /_synapse/admin/v1/rooms/<room_id>`. ([\#9889](https://github.com/matrix-org/synapse/issues/9889))
|
||||
- Add limits to how often Synapse will GC, ensuring that large servers do not end up GC thrashing if `gc_thresholds` has not been correctly set. ([\#9902](https://github.com/matrix-org/synapse/issues/9902))
|
||||
- Improve performance of sending events for worker-based deployments using Redis. ([\#9905](https://github.com/matrix-org/synapse/issues/9905), [\#9950](https://github.com/matrix-org/synapse/issues/9950), [\#9951](https://github.com/matrix-org/synapse/issues/9951))
|
||||
- Improve performance after joining a large room when presence is enabled. ([\#9910](https://github.com/matrix-org/synapse/issues/9910), [\#9916](https://github.com/matrix-org/synapse/issues/9916))
|
||||
- Support stable identifiers for [MSC1772](https://github.com/matrix-org/matrix-doc/pull/1772) Spaces. `m.space.child` events will now be taken into account when populating the experimental spaces summary response. Please see [the upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.34.0/UPGRADE.rst#upgrading-to-v1340) if you have customised `room_invite_state_types` in your configuration. ([\#9915](https://github.com/matrix-org/synapse/issues/9915), [\#9966](https://github.com/matrix-org/synapse/issues/9966))
|
||||
- Improve performance of backfilling in large rooms. ([\#9935](https://github.com/matrix-org/synapse/issues/9935))
|
||||
- Add a config option to allow you to prevent device display names from being shared over federation. Contributed by @aaronraimist. ([\#9945](https://github.com/matrix-org/synapse/issues/9945))
|
||||
- Update support for [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946): Spaces Summary. ([\#9947](https://github.com/matrix-org/synapse/issues/9947), [\#9954](https://github.com/matrix-org/synapse/issues/9954))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix a bug introduced in v1.32.0 where the associated connection was improperly logged for SQL logging statements. ([\#9895](https://github.com/matrix-org/synapse/issues/9895))
|
||||
- Correct the type hint for the `user_may_create_room_alias` method of spam checkers. It is provided a `RoomAlias`, not a `str`. ([\#9896](https://github.com/matrix-org/synapse/issues/9896))
|
||||
- Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession. ([\#9910](https://github.com/matrix-org/synapse/issues/9910))
|
||||
- Include the `origin_server_ts` property in the experimental [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946) support to allow clients to properly sort rooms. ([\#9928](https://github.com/matrix-org/synapse/issues/9928))
|
||||
- Fix bugs introduced in v1.23.0 which made the PostgreSQL port script fail when run with a newly-created SQLite database. ([\#9930](https://github.com/matrix-org/synapse/issues/9930))
|
||||
- Fix a bug introduced in Synapse 1.29.0 which caused `m.room_key_request` to-device messages sent from one user to another to be dropped. ([\#9961](https://github.com/matrix-org/synapse/issues/9961), [\#9965](https://github.com/matrix-org/synapse/issues/9965))
|
||||
- Fix a bug introduced in v1.27.0 preventing users and appservices exempt from ratelimiting from creating rooms with many invitees. ([\#9968](https://github.com/matrix-org/synapse/issues/9968))
|
||||
|
||||
|
||||
Updates to the Docker image
|
||||
---------------------------
|
||||
|
||||
- Add `startup_delay` to docker healthcheck to reduce waiting time for coming online and update the documentation with extra options. Contributed by @Maquis196. ([\#9913](https://github.com/matrix-org/synapse/issues/9913))
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Add `port` argument to the Postgres database sample config section. ([\#9911](https://github.com/matrix-org/synapse/issues/9911))
|
||||
|
||||
|
||||
Deprecations and Removals
|
||||
-------------------------
|
||||
|
||||
- Mark as deprecated `POST /_synapse/admin/v1/rooms/<room_id>/delete`. ([\#9889](https://github.com/matrix-org/synapse/issues/9889))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Reduce the length of Synapse's access tokens. ([\#5588](https://github.com/matrix-org/synapse/issues/5588))
|
||||
- Export jemalloc stats to Prometheus if it is being used. ([\#9882](https://github.com/matrix-org/synapse/issues/9882))
|
||||
- Add type hints to presence handler. ([\#9885](https://github.com/matrix-org/synapse/issues/9885))
|
||||
- Reduce memory usage of the LRU caches. ([\#9886](https://github.com/matrix-org/synapse/issues/9886))
|
||||
- Add type hints to the `synapse.handlers` module. ([\#9896](https://github.com/matrix-org/synapse/issues/9896))
|
||||
- Time response time for external cache requests. ([\#9904](https://github.com/matrix-org/synapse/issues/9904))
|
||||
- Minor fixes to the `make_full_schema.sh` script. ([\#9931](https://github.com/matrix-org/synapse/issues/9931))
|
||||
- Move database schema files into a common directory. ([\#9932](https://github.com/matrix-org/synapse/issues/9932))
|
||||
- Add debug logging for lost/delayed to-device messages. ([\#9959](https://github.com/matrix-org/synapse/issues/9959))
|
||||
|
||||
|
||||
Synapse 1.33.2 (2021-05-11)
|
||||
===========================
|
||||
|
||||
Due to the security issue highlighted below, server administrators are encouraged to update Synapse. We are not aware of these vulnerabilities being exploited in the wild.
|
||||
|
||||
Security advisory
|
||||
-----------------
|
||||
|
||||
This release fixes a denial of service attack ([CVE-2021-29471](https://github.com/matrix-org/synapse/security/advisories/GHSA-x345-32rc-8h85)) against Synapse's push rules implementation. Server admins are encouraged to upgrade.
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Unpin attrs dependency. ([\#9946](https://github.com/matrix-org/synapse/issues/9946))
|
||||
|
||||
|
||||
Synapse 1.33.1 (2021-05-06)
|
||||
===========================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix bug where `/sync` would break if using the latest version of `attrs` dependency, by pinning to a previous version. ([\#9937](https://github.com/matrix-org/synapse/issues/9937))
|
||||
|
||||
|
||||
Synapse 1.33.0 (2021-05-05)
|
||||
===========================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Build Debian packages for Ubuntu 21.04 (Hirsute Hippo). ([\#9909](https://github.com/matrix-org/synapse/issues/9909))
|
||||
|
||||
|
||||
Synapse 1.33.0rc2 (2021-04-29)
|
||||
==============================
|
||||
|
||||
|
||||
29
UPGRADE.rst
29
UPGRADE.rst
@@ -85,6 +85,35 @@ for example:
|
||||
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
|
||||
Upgrading to v1.34.0
|
||||
====================
|
||||
|
||||
`room_invite_state_types` configuration setting
|
||||
-----------------------------------------------
|
||||
|
||||
The ``room_invite_state_types`` configuration setting has been deprecated and
|
||||
replaced with ``room_prejoin_state``. See the `sample configuration file <https://github.com/matrix-org/synapse/blob/v1.34.0/docs/sample_config.yaml#L1515>`_.
|
||||
|
||||
If you have set ``room_invite_state_types`` to the default value you should simply
|
||||
remove it from your configuration file. The default value used to be:
|
||||
|
||||
.. code:: yaml
|
||||
|
||||
room_invite_state_types:
|
||||
- "m.room.join_rules"
|
||||
- "m.room.canonical_alias"
|
||||
- "m.room.avatar"
|
||||
- "m.room.encryption"
|
||||
- "m.room.name"
|
||||
|
||||
If you have customised this value by adding addition state types, you should
|
||||
remove ``room_invite_state_types`` and configure ``additional_event_types`` with
|
||||
your customisations.
|
||||
|
||||
If you have customised this value by removing state types, you should rename
|
||||
``room_invite_state_types`` to ``additional_event_types``, and set
|
||||
``disable_default_event_types`` to ``true``.
|
||||
|
||||
Upgrading to v1.33.0
|
||||
====================
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Add experimental option to track memory usage of the caches.
|
||||
@@ -1 +0,0 @@
|
||||
Export jemalloc stats to Prometheus if it is being used.
|
||||
@@ -1 +0,0 @@
|
||||
Add type hints to presence handler.
|
||||
@@ -1 +0,0 @@
|
||||
Reduce memory usage of the LRU caches.
|
||||
@@ -1 +0,0 @@
|
||||
Add support for `DELETE /_synapse/admin/v1/rooms/<room_id>`.
|
||||
@@ -1 +0,0 @@
|
||||
Mark as deprecated `POST /_synapse/admin/v1/rooms/<room_id>/delete`.
|
||||
@@ -1 +0,0 @@
|
||||
Fix a bug introduced in v1.32.0 where the associated connection was improperly logged for SQL logging statements.
|
||||
@@ -1 +0,0 @@
|
||||
Correct the type hint for the `user_may_create_room_alias` method of spam checkers. It is provided a `RoomAlias`, not a `str`.
|
||||
@@ -1 +0,0 @@
|
||||
Add type hints to the `synapse.handlers` module.
|
||||
@@ -1 +0,0 @@
|
||||
Add limits to how often Synapse will GC, ensuring that large servers do not end up GC thrashing if `gc_thresholds` has not been correctly set.
|
||||
@@ -1 +0,0 @@
|
||||
Time response time for external cache requests.
|
||||
@@ -1 +0,0 @@
|
||||
Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession.
|
||||
@@ -1 +0,0 @@
|
||||
Improve performance after joining a large room when presence is enabled.
|
||||
@@ -1 +0,0 @@
|
||||
Add `port` argument to the Postgres database sample config section.
|
||||
@@ -1 +0,0 @@
|
||||
Improve performance of handling presence when joining large rooms.
|
||||
1
changelog.d/xxxx.misc
Normal file
1
changelog.d/xxxx.misc
Normal file
@@ -0,0 +1 @@
|
||||
Ensure python uses `malloc` when running Synapse in Docker.
|
||||
18
debian/changelog
vendored
18
debian/changelog
vendored
@@ -1,3 +1,21 @@
|
||||
matrix-synapse-py3 (1.33.2) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.33.2.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 11 May 2021 11:17:59 +0100
|
||||
|
||||
matrix-synapse-py3 (1.33.1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.33.1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Thu, 06 May 2021 14:06:33 +0100
|
||||
|
||||
matrix-synapse-py3 (1.33.0) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.33.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Wed, 05 May 2021 14:15:27 +0100
|
||||
|
||||
matrix-synapse-py3 (1.32.2) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.32.2.
|
||||
|
||||
@@ -88,5 +88,5 @@ EXPOSE 8008/tcp 8009/tcp 8448/tcp
|
||||
|
||||
ENTRYPOINT ["/start.py"]
|
||||
|
||||
HEALTHCHECK --interval=1m --timeout=5s \
|
||||
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
|
||||
CMD curl -fSs http://localhost:8008/health || exit 1
|
||||
|
||||
@@ -191,6 +191,16 @@ whilst running the above `docker run` commands.
|
||||
```
|
||||
--no-healthcheck
|
||||
```
|
||||
|
||||
## Disabling the healthcheck in docker-compose file
|
||||
|
||||
If you wish to disable the healthcheck via docker-compose, append the following to your service configuration.
|
||||
|
||||
```
|
||||
healthcheck:
|
||||
disable: true
|
||||
```
|
||||
|
||||
## Setting custom healthcheck on docker run
|
||||
|
||||
If you wish to point the healthcheck at a different port with docker command, add the following
|
||||
@@ -202,14 +212,15 @@ If you wish to point the healthcheck at a different port with docker command, ad
|
||||
## Setting the healthcheck in docker-compose file
|
||||
|
||||
You can add the following to set a custom healthcheck in a docker compose file.
|
||||
You will need version >2.1 for this to work.
|
||||
You will need docker-compose version >2.1 for this to work.
|
||||
|
||||
```
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-fSs", "http://localhost:8008/health"]
|
||||
interval: 1m
|
||||
timeout: 10s
|
||||
interval: 15s
|
||||
timeout: 5s
|
||||
retries: 3
|
||||
start_period: 5s
|
||||
```
|
||||
|
||||
## Using jemalloc
|
||||
|
||||
@@ -218,6 +218,7 @@ def main(args, environ):
|
||||
|
||||
if os.path.isfile(jemallocpath):
|
||||
environ["LD_PRELOAD"] = jemallocpath
|
||||
environ["PYTHONMALLOC"] = "malloc"
|
||||
else:
|
||||
log("Could not find %s, will not use" % (jemallocpath,))
|
||||
|
||||
|
||||
@@ -741,6 +741,12 @@ acme:
|
||||
#
|
||||
#allow_profile_lookup_over_federation: false
|
||||
|
||||
# Uncomment to disable device display name lookup over federation. By default, the
|
||||
# Federation API allows other homeservers to obtain device display names of any user
|
||||
# on this homeserver. Defaults to 'true'.
|
||||
#
|
||||
#allow_device_name_lookup_over_federation: false
|
||||
|
||||
|
||||
## Caching ##
|
||||
|
||||
@@ -1515,6 +1521,7 @@ room_prejoin_state:
|
||||
# - m.room.avatar
|
||||
# - m.room.encryption
|
||||
# - m.room.name
|
||||
# - m.room.create
|
||||
#
|
||||
# Uncomment the following to disable these defaults (so that only the event
|
||||
# types listed in 'additional_event_types' are shared). Defaults to 'false'.
|
||||
|
||||
@@ -21,9 +21,10 @@ DISTS = (
|
||||
"debian:buster",
|
||||
"debian:bullseye",
|
||||
"debian:sid",
|
||||
"ubuntu:bionic",
|
||||
"ubuntu:focal",
|
||||
"ubuntu:groovy",
|
||||
"ubuntu:bionic", # 18.04 LTS (our EOL forced by Py36 on 2021-12-23)
|
||||
"ubuntu:focal", # 20.04 LTS (our EOL forced by Py38 on 2024-10-14)
|
||||
"ubuntu:groovy", # 20.10 (EOL 2021-07-07)
|
||||
"ubuntu:hirsute", # 21.04 (EOL 2022-01-05)
|
||||
)
|
||||
|
||||
DESC = '''\
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/usr/bin/env python2
|
||||
#!/usr/bin/env python
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
# It does so by having Synapse generate an up-to-date SQLite DB, then running
|
||||
# synapse_port_db to convert it to Postgres. It then dumps the contents of both.
|
||||
|
||||
POSTGRES_HOST="localhost"
|
||||
export PGHOST="localhost"
|
||||
POSTGRES_DB_NAME="synapse_full_schema.$$"
|
||||
|
||||
SQLITE_FULL_SCHEMA_OUTPUT_FILE="full.sql.sqlite"
|
||||
@@ -32,7 +32,7 @@ usage() {
|
||||
while getopts "p:co:h" opt; do
|
||||
case $opt in
|
||||
p)
|
||||
POSTGRES_USERNAME=$OPTARG
|
||||
export PGUSER=$OPTARG
|
||||
;;
|
||||
c)
|
||||
# Print all commands that are being executed
|
||||
@@ -69,7 +69,7 @@ if [ ${#unsatisfied_requirements} -ne 0 ]; then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ -z "$POSTGRES_USERNAME" ]; then
|
||||
if [ -z "$PGUSER" ]; then
|
||||
echo "No postgres username supplied"
|
||||
usage
|
||||
exit 1
|
||||
@@ -84,8 +84,9 @@ fi
|
||||
# Create the output directory if it doesn't exist
|
||||
mkdir -p "$OUTPUT_DIR"
|
||||
|
||||
read -rsp "Postgres password for '$POSTGRES_USERNAME': " POSTGRES_PASSWORD
|
||||
read -rsp "Postgres password for '$PGUSER': " PGPASSWORD
|
||||
echo ""
|
||||
export PGPASSWORD
|
||||
|
||||
# Exit immediately if a command fails
|
||||
set -e
|
||||
@@ -131,9 +132,9 @@ report_stats: false
|
||||
database:
|
||||
name: "psycopg2"
|
||||
args:
|
||||
user: "$POSTGRES_USERNAME"
|
||||
host: "$POSTGRES_HOST"
|
||||
password: "$POSTGRES_PASSWORD"
|
||||
user: "$PGUSER"
|
||||
host: "$PGHOST"
|
||||
password: "$PGPASSWORD"
|
||||
database: "$POSTGRES_DB_NAME"
|
||||
|
||||
# Suppress the key server warning.
|
||||
@@ -150,7 +151,7 @@ scripts-dev/update_database --database-config "$SQLITE_CONFIG"
|
||||
|
||||
# Create the PostgreSQL database.
|
||||
echo "Creating postgres database..."
|
||||
createdb $POSTGRES_DB_NAME
|
||||
createdb --lc-collate=C --lc-ctype=C --template=template0 "$POSTGRES_DB_NAME"
|
||||
|
||||
echo "Copying data from SQLite3 to Postgres with synapse_port_db..."
|
||||
if [ -z "$COVERAGE" ]; then
|
||||
@@ -181,7 +182,7 @@ DROP TABLE user_directory_search_docsize;
|
||||
DROP TABLE user_directory_search_stat;
|
||||
"
|
||||
sqlite3 "$SQLITE_DB" <<< "$SQL"
|
||||
psql $POSTGRES_DB_NAME -U "$POSTGRES_USERNAME" -w <<< "$SQL"
|
||||
psql "$POSTGRES_DB_NAME" -w <<< "$SQL"
|
||||
|
||||
echo "Dumping SQLite3 schema to '$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE'..."
|
||||
sqlite3 "$SQLITE_DB" ".dump" > "$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE"
|
||||
|
||||
@@ -913,10 +913,11 @@ class Porter(object):
|
||||
(curr_forward_id + 1,),
|
||||
)
|
||||
|
||||
txn.execute(
|
||||
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
|
||||
(curr_backward_id + 1,),
|
||||
)
|
||||
if curr_backward_id:
|
||||
txn.execute(
|
||||
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
|
||||
(curr_backward_id + 1,),
|
||||
)
|
||||
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
|
||||
@@ -954,10 +955,11 @@ class Porter(object):
|
||||
(curr_chain_id,),
|
||||
)
|
||||
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"_setup_event_auth_chain_id", r,
|
||||
)
|
||||
|
||||
if curr_chain_id is not None:
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"_setup_event_auth_chain_id",
|
||||
r,
|
||||
)
|
||||
|
||||
|
||||
##############################################
|
||||
|
||||
@@ -47,7 +47,7 @@ try:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.33.0rc2"
|
||||
__version__ = "1.34.0rc1"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
||||
@@ -110,13 +110,18 @@ class EventTypes:
|
||||
|
||||
Dummy = "org.matrix.dummy_event"
|
||||
|
||||
SpaceChild = "m.space.child"
|
||||
SpaceParent = "m.space.parent"
|
||||
MSC1772_SPACE_CHILD = "org.matrix.msc1772.space.child"
|
||||
MSC1772_SPACE_PARENT = "org.matrix.msc1772.space.parent"
|
||||
|
||||
|
||||
class ToDeviceEventTypes:
|
||||
RoomKeyRequest = "m.room_key_request"
|
||||
|
||||
|
||||
class EduTypes:
|
||||
Presence = "m.presence"
|
||||
RoomKeyRequest = "m.room_key_request"
|
||||
|
||||
|
||||
class RejectedReason:
|
||||
@@ -174,6 +179,7 @@ class EventContentFields:
|
||||
SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"
|
||||
|
||||
# cf https://github.com/matrix-org/matrix-doc/pull/1772
|
||||
ROOM_TYPE = "type"
|
||||
MSC1772_ROOM_TYPE = "org.matrix.msc1772.type"
|
||||
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ from jsonschema import FormatChecker
|
||||
from synapse.api.constants import EventContentFields
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import RoomID, UserID
|
||||
|
||||
FILTER_SCHEMA = {
|
||||
@@ -291,13 +290,6 @@ class Filter:
|
||||
ev_type = "m.presence"
|
||||
contains_url = False
|
||||
labels = [] # type: List[str]
|
||||
elif isinstance(event, EventBase):
|
||||
sender = event.sender
|
||||
room_id = event.room_id
|
||||
ev_type = event.type
|
||||
content = event.content
|
||||
contains_url = isinstance(content.get("url"), str)
|
||||
labels = content.get(EventContentFields.LABELS, [])
|
||||
else:
|
||||
sender = event.get("sender", None)
|
||||
if not sender:
|
||||
|
||||
@@ -57,6 +57,7 @@ class Ratelimiter:
|
||||
rate_hz: Optional[float] = None,
|
||||
burst_count: Optional[int] = None,
|
||||
update: bool = True,
|
||||
n_actions: int = 1,
|
||||
_time_now_s: Optional[int] = None,
|
||||
) -> Tuple[bool, float]:
|
||||
"""Can the entity (e.g. user or IP address) perform the action?
|
||||
@@ -76,6 +77,9 @@ class Ratelimiter:
|
||||
burst_count: How many actions that can be performed before being limited.
|
||||
Overrides the value set during instantiation if set.
|
||||
update: Whether to count this check as performing the action
|
||||
n_actions: The number of times the user wants to do this action. If the user
|
||||
cannot do all of the actions, the user's action count is not incremented
|
||||
at all.
|
||||
_time_now_s: The current time. Optional, defaults to the current time according
|
||||
to self.clock. Only used by tests.
|
||||
|
||||
@@ -124,17 +128,20 @@ class Ratelimiter:
|
||||
time_delta = time_now_s - time_start
|
||||
performed_count = action_count - time_delta * rate_hz
|
||||
if performed_count < 0:
|
||||
# Allow, reset back to count 1
|
||||
allowed = True
|
||||
performed_count = 0
|
||||
time_start = time_now_s
|
||||
action_count = 1.0
|
||||
elif performed_count > burst_count - 1.0:
|
||||
|
||||
# This check would be easier read as performed_count + n_actions > burst_count,
|
||||
# but performed_count might be a very precise float (with lots of numbers
|
||||
# following the point) in which case Python might round it up when adding it to
|
||||
# n_actions. Writing it this way ensures it doesn't happen.
|
||||
if performed_count > burst_count - n_actions:
|
||||
# Deny, we have exceeded our burst count
|
||||
allowed = False
|
||||
else:
|
||||
# We haven't reached our limit yet
|
||||
allowed = True
|
||||
action_count += 1.0
|
||||
action_count = performed_count + n_actions
|
||||
|
||||
if update:
|
||||
self.actions[key] = (action_count, time_start, rate_hz)
|
||||
@@ -182,6 +189,7 @@ class Ratelimiter:
|
||||
rate_hz: Optional[float] = None,
|
||||
burst_count: Optional[int] = None,
|
||||
update: bool = True,
|
||||
n_actions: int = 1,
|
||||
_time_now_s: Optional[int] = None,
|
||||
):
|
||||
"""Checks if an action can be performed. If not, raises a LimitExceededError
|
||||
@@ -201,6 +209,9 @@ class Ratelimiter:
|
||||
burst_count: How many actions that can be performed before being limited.
|
||||
Overrides the value set during instantiation if set.
|
||||
update: Whether to count this check as performing the action
|
||||
n_actions: The number of times the user wants to do this action. If the user
|
||||
cannot do all of the actions, the user's action count is not incremented
|
||||
at all.
|
||||
_time_now_s: The current time. Optional, defaults to the current time according
|
||||
to self.clock. Only used by tests.
|
||||
|
||||
@@ -216,6 +227,7 @@ class Ratelimiter:
|
||||
rate_hz=rate_hz,
|
||||
burst_count=burst_count,
|
||||
update=update,
|
||||
n_actions=n_actions,
|
||||
_time_now_s=time_now_s,
|
||||
)
|
||||
|
||||
|
||||
@@ -88,10 +88,6 @@ class ApiConfig(Config):
|
||||
if not room_prejoin_state_config.get("disable_default_event_types"):
|
||||
yield from _DEFAULT_PREJOIN_STATE_TYPES
|
||||
|
||||
if self.spaces_enabled:
|
||||
# MSC1772 suggests adding m.room.create to the prejoin state
|
||||
yield EventTypes.Create
|
||||
|
||||
yield from room_prejoin_state_config.get("additional_event_types", [])
|
||||
|
||||
|
||||
@@ -109,6 +105,8 @@ _DEFAULT_PREJOIN_STATE_TYPES = [
|
||||
EventTypes.RoomAvatar,
|
||||
EventTypes.RoomEncryption,
|
||||
EventTypes.Name,
|
||||
# Per MSC1772.
|
||||
EventTypes.Create,
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -44,6 +44,10 @@ class FederationConfig(Config):
|
||||
"allow_profile_lookup_over_federation", True
|
||||
)
|
||||
|
||||
self.allow_device_name_lookup_over_federation = config.get(
|
||||
"allow_device_name_lookup_over_federation", True
|
||||
)
|
||||
|
||||
def generate_config_section(self, config_dir_path, server_name, **kwargs):
|
||||
return """\
|
||||
## Federation ##
|
||||
@@ -75,6 +79,12 @@ class FederationConfig(Config):
|
||||
# on this homeserver. Defaults to 'true'.
|
||||
#
|
||||
#allow_profile_lookup_over_federation: false
|
||||
|
||||
# Uncomment to disable device display name lookup over federation. By default, the
|
||||
# Federation API allows other homeservers to obtain device display names of any user
|
||||
# on this homeserver. Defaults to 'true'.
|
||||
#
|
||||
#allow_device_name_lookup_over_federation: false
|
||||
"""
|
||||
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ import os
|
||||
import warnings
|
||||
from datetime import datetime
|
||||
from hashlib import sha256
|
||||
from typing import List, Optional
|
||||
from typing import List, Optional, Pattern
|
||||
|
||||
from unpaddedbase64 import encode_base64
|
||||
|
||||
@@ -124,7 +124,7 @@ class TlsConfig(Config):
|
||||
fed_whitelist_entries = []
|
||||
|
||||
# Support globs (*) in whitelist values
|
||||
self.federation_certificate_verification_whitelist = [] # type: List[str]
|
||||
self.federation_certificate_verification_whitelist = [] # type: List[Pattern]
|
||||
for entry in fed_whitelist_entries:
|
||||
try:
|
||||
entry_regex = glob_to_regex(entry.encode("ascii").decode("ascii"))
|
||||
|
||||
@@ -48,7 +48,7 @@ def check_event_content_hash(
|
||||
|
||||
# some malformed events lack a 'hashes'. Protect against it being missing
|
||||
# or a weird type by basically treating it the same as an unhashed event.
|
||||
hashes = getattr(event, "hashes", None)
|
||||
hashes = event.get("hashes")
|
||||
# nb it might be a frozendict or a dict
|
||||
if not isinstance(hashes, collections.abc.Mapping):
|
||||
raise SynapseError(
|
||||
|
||||
@@ -16,7 +16,8 @@
|
||||
import abc
|
||||
import logging
|
||||
import urllib
|
||||
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
|
||||
from collections import defaultdict
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
import attr
|
||||
from signedjson.key import (
|
||||
@@ -41,18 +42,17 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.config.key import TrustedKeyServer
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.utils import prune_event_dict
|
||||
from synapse.logging.context import (
|
||||
PreserveLoggingContext,
|
||||
make_deferred_yieldable,
|
||||
preserve_fn,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import Linearizer, yieldable_gather_results
|
||||
from synapse.util.async_helpers import yieldable_gather_results
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -74,6 +74,8 @@ class VerifyJsonRequest:
|
||||
minimum_valid_until_ts: time at which we require the signing key to
|
||||
be valid. (0 implies we don't care)
|
||||
|
||||
request_name: The name of the request.
|
||||
|
||||
key_ids: The set of key_ids to that could be used to verify the JSON object
|
||||
|
||||
key_ready (Deferred[str, str, nacl.signing.VerifyKey]):
|
||||
@@ -86,93 +88,20 @@ class VerifyJsonRequest:
|
||||
"""
|
||||
|
||||
server_name = attr.ib(type=str)
|
||||
json_object_callback = attr.ib(type=Callable[[], JsonDict])
|
||||
json_object = attr.ib(type=JsonDict)
|
||||
minimum_valid_until_ts = attr.ib(type=int)
|
||||
key_ids = attr.ib(type=List[str])
|
||||
request_name = attr.ib(type=str)
|
||||
key_ids = attr.ib(init=False, type=List[str])
|
||||
key_ready = attr.ib(default=attr.Factory(defer.Deferred), type=defer.Deferred)
|
||||
|
||||
@staticmethod
|
||||
def from_json_object(
|
||||
server_name: str, minimum_valid_until_ms: int, json_object: JsonDict
|
||||
):
|
||||
key_ids = signature_ids(json_object, server_name)
|
||||
return VerifyJsonRequest(
|
||||
server_name, lambda: json_object, minimum_valid_until_ms, key_ids
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def from_event(
|
||||
server_name: str,
|
||||
minimum_valid_until_ms: int,
|
||||
event: EventBase,
|
||||
):
|
||||
key_ids = list(event.signatures.get(server_name, []))
|
||||
return VerifyJsonRequest(
|
||||
server_name,
|
||||
lambda: prune_event_dict(event.room_version, event.get_pdu_json()),
|
||||
minimum_valid_until_ms,
|
||||
key_ids,
|
||||
)
|
||||
def __attrs_post_init__(self):
|
||||
self.key_ids = signature_ids(self.json_object, self.server_name)
|
||||
|
||||
|
||||
class KeyLookupError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
class _QueueValue:
|
||||
server_name = attr.ib(type=str)
|
||||
minimum_valid_until_ts = attr.ib(type=int)
|
||||
key_ids = attr.ib(type=List[str])
|
||||
|
||||
|
||||
class _Queue:
|
||||
def __init__(self, name, clock, process_items):
|
||||
self._name = name
|
||||
self._clock = clock
|
||||
self._is_processing = False
|
||||
self._next_values = []
|
||||
|
||||
self.process_items = process_items
|
||||
|
||||
async def add_to_queue(self, value: _QueueValue) -> Dict[str, FetchKeyResult]:
|
||||
d = defer.Deferred()
|
||||
self._next_values.append((value, d))
|
||||
|
||||
if not self._is_processing:
|
||||
run_as_background_process(self._name, self._unsafe_process)
|
||||
|
||||
return await make_deferred_yieldable(d)
|
||||
|
||||
async def _unsafe_process(self):
|
||||
try:
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
self._is_processing = True
|
||||
|
||||
while self._next_values:
|
||||
# We purposefully defer to the next loop.
|
||||
await self._clock.sleep(0)
|
||||
|
||||
next_values = self._next_values
|
||||
self._next_values = []
|
||||
|
||||
try:
|
||||
values = [value for value, _ in next_values]
|
||||
results = await self.process_items(values)
|
||||
|
||||
for value, deferred in next_values:
|
||||
with PreserveLoggingContext():
|
||||
deferred.callback(results.get(value.server_name, {}))
|
||||
|
||||
except Exception as e:
|
||||
for _, deferred in next_values:
|
||||
deferred.errback(e)
|
||||
|
||||
finally:
|
||||
self._is_processing = False
|
||||
|
||||
|
||||
class Keyring:
|
||||
def __init__(
|
||||
self, hs: "HomeServer", key_fetchers: "Optional[Iterable[KeyFetcher]]" = None
|
||||
@@ -187,7 +116,12 @@ class Keyring:
|
||||
)
|
||||
self._key_fetchers = key_fetchers
|
||||
|
||||
self._server_queue = Linearizer("keyring_server")
|
||||
# map from server name to Deferred. Has an entry for each server with
|
||||
# an ongoing key download; the Deferred completes once the download
|
||||
# completes.
|
||||
#
|
||||
# These are regular, logcontext-agnostic Deferreds.
|
||||
self.key_downloads = {} # type: Dict[str, defer.Deferred]
|
||||
|
||||
def verify_json_for_server(
|
||||
self,
|
||||
@@ -196,150 +130,365 @@ class Keyring:
|
||||
validity_time: int,
|
||||
request_name: str,
|
||||
) -> defer.Deferred:
|
||||
request = VerifyJsonRequest.from_json_object(
|
||||
server_name,
|
||||
validity_time,
|
||||
json_object,
|
||||
)
|
||||
return defer.ensureDeferred(self._verify_object(request))
|
||||
"""Verify that a JSON object has been signed by a given server
|
||||
|
||||
Args:
|
||||
server_name: name of the server which must have signed this object
|
||||
|
||||
json_object: object to be checked
|
||||
|
||||
validity_time: timestamp at which we require the signing key to
|
||||
be valid. (0 implies we don't care)
|
||||
|
||||
request_name: an identifier for this json object (eg, an event id)
|
||||
for logging.
|
||||
|
||||
Returns:
|
||||
Deferred[None]: completes if the the object was correctly signed, otherwise
|
||||
errbacks with an error
|
||||
"""
|
||||
req = VerifyJsonRequest(server_name, json_object, validity_time, request_name)
|
||||
requests = (req,)
|
||||
return make_deferred_yieldable(self._verify_objects(requests)[0])
|
||||
|
||||
def verify_json_objects_for_server(
|
||||
self, server_and_json: Iterable[Tuple[str, dict, int, str]]
|
||||
) -> List[defer.Deferred]:
|
||||
return [
|
||||
defer.ensureDeferred(
|
||||
run_in_background(
|
||||
self._verify_object,
|
||||
VerifyJsonRequest.from_json_object(
|
||||
server_name,
|
||||
validity_time,
|
||||
json_object,
|
||||
),
|
||||
)
|
||||
)
|
||||
"""Bulk verifies signatures of json objects, bulk fetching keys as
|
||||
necessary.
|
||||
|
||||
Args:
|
||||
server_and_json:
|
||||
Iterable of (server_name, json_object, validity_time, request_name)
|
||||
tuples.
|
||||
|
||||
validity_time is a timestamp at which the signing key must be
|
||||
valid.
|
||||
|
||||
request_name is an identifier for this json object (eg, an event id)
|
||||
for logging.
|
||||
|
||||
Returns:
|
||||
List<Deferred[None]>: for each input triplet, a deferred indicating success
|
||||
or failure to verify each json object's signature for the given
|
||||
server_name. The deferreds run their callbacks in the sentinel
|
||||
logcontext.
|
||||
"""
|
||||
return self._verify_objects(
|
||||
VerifyJsonRequest(server_name, json_object, validity_time, request_name)
|
||||
for server_name, json_object, validity_time, request_name in server_and_json
|
||||
]
|
||||
)
|
||||
|
||||
def verify_events_for_server(
|
||||
self, server_and_json: Iterable[Tuple[str, EventBase, int]]
|
||||
def _verify_objects(
|
||||
self, verify_requests: Iterable[VerifyJsonRequest]
|
||||
) -> List[defer.Deferred]:
|
||||
return [
|
||||
run_in_background(
|
||||
self._verify_object,
|
||||
VerifyJsonRequest.from_event(
|
||||
server_name,
|
||||
validity_time,
|
||||
event,
|
||||
),
|
||||
"""Does the work of verify_json_[objects_]for_server
|
||||
|
||||
|
||||
Args:
|
||||
verify_requests: Iterable of verification requests.
|
||||
|
||||
Returns:
|
||||
List<Deferred[None]>: for each input item, a deferred indicating success
|
||||
or failure to verify each json object's signature for the given
|
||||
server_name. The deferreds run their callbacks in the sentinel
|
||||
logcontext.
|
||||
"""
|
||||
# a list of VerifyJsonRequests which are awaiting a key lookup
|
||||
key_lookups = []
|
||||
handle = preserve_fn(_handle_key_deferred)
|
||||
|
||||
def process(verify_request: VerifyJsonRequest) -> defer.Deferred:
|
||||
"""Process an entry in the request list
|
||||
|
||||
Adds a key request to key_lookups, and returns a deferred which
|
||||
will complete or fail (in the sentinel context) when verification completes.
|
||||
"""
|
||||
if not verify_request.key_ids:
|
||||
return defer.fail(
|
||||
SynapseError(
|
||||
400,
|
||||
"Not signed by %s" % (verify_request.server_name,),
|
||||
Codes.UNAUTHORIZED,
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"Verifying %s for %s with key_ids %s, min_validity %i",
|
||||
verify_request.request_name,
|
||||
verify_request.server_name,
|
||||
verify_request.key_ids,
|
||||
verify_request.minimum_valid_until_ts,
|
||||
)
|
||||
for server_name, event, validity_time in server_and_json
|
||||
]
|
||||
|
||||
async def _verify_object(self, verify_request: VerifyJsonRequest):
|
||||
# TODO: Use a batching thing.
|
||||
with (await self._server_queue.queue(verify_request.server_name)):
|
||||
found_keys: Dict[str, FetchKeyResult] = {}
|
||||
missing_key_ids = set(verify_request.key_ids)
|
||||
for fetcher in self._key_fetchers:
|
||||
if not missing_key_ids:
|
||||
break
|
||||
# add the key request to the queue, but don't start it off yet.
|
||||
key_lookups.append(verify_request)
|
||||
|
||||
keys = await fetcher.get_keys(
|
||||
verify_request.server_name,
|
||||
list(missing_key_ids),
|
||||
# now run _handle_key_deferred, which will wait for the key request
|
||||
# to complete and then do the verification.
|
||||
#
|
||||
# We want _handle_key_request to log to the right context, so we
|
||||
# wrap it with preserve_fn (aka run_in_background)
|
||||
return handle(verify_request)
|
||||
|
||||
results = [process(r) for r in verify_requests]
|
||||
|
||||
if key_lookups:
|
||||
run_in_background(self._start_key_lookups, key_lookups)
|
||||
|
||||
return results
|
||||
|
||||
async def _start_key_lookups(
|
||||
self, verify_requests: List[VerifyJsonRequest]
|
||||
) -> None:
|
||||
"""Sets off the key fetches for each verify request
|
||||
|
||||
Once each fetch completes, verify_request.key_ready will be resolved.
|
||||
|
||||
Args:
|
||||
verify_requests:
|
||||
"""
|
||||
|
||||
try:
|
||||
# map from server name to a set of outstanding request ids
|
||||
server_to_request_ids = {} # type: Dict[str, Set[int]]
|
||||
|
||||
for verify_request in verify_requests:
|
||||
server_name = verify_request.server_name
|
||||
request_id = id(verify_request)
|
||||
server_to_request_ids.setdefault(server_name, set()).add(request_id)
|
||||
|
||||
# Wait for any previous lookups to complete before proceeding.
|
||||
await self.wait_for_previous_lookups(server_to_request_ids.keys())
|
||||
|
||||
# take out a lock on each of the servers by sticking a Deferred in
|
||||
# key_downloads
|
||||
for server_name in server_to_request_ids.keys():
|
||||
self.key_downloads[server_name] = defer.Deferred()
|
||||
logger.debug("Got key lookup lock on %s", server_name)
|
||||
|
||||
# When we've finished fetching all the keys for a given server_name,
|
||||
# drop the lock by resolving the deferred in key_downloads.
|
||||
def drop_server_lock(server_name):
|
||||
d = self.key_downloads.pop(server_name)
|
||||
d.callback(None)
|
||||
|
||||
def lookup_done(res, verify_request):
|
||||
server_name = verify_request.server_name
|
||||
server_requests = server_to_request_ids[server_name]
|
||||
server_requests.remove(id(verify_request))
|
||||
|
||||
# if there are no more requests for this server, we can drop the lock.
|
||||
if not server_requests:
|
||||
logger.debug("Releasing key lookup lock on %s", server_name)
|
||||
drop_server_lock(server_name)
|
||||
|
||||
return res
|
||||
|
||||
for verify_request in verify_requests:
|
||||
verify_request.key_ready.addBoth(lookup_done, verify_request)
|
||||
|
||||
# Actually start fetching keys.
|
||||
self._get_server_verify_keys(verify_requests)
|
||||
except Exception:
|
||||
logger.exception("Error starting key lookups")
|
||||
|
||||
async def wait_for_previous_lookups(self, server_names: Iterable[str]) -> None:
|
||||
"""Waits for any previous key lookups for the given servers to finish.
|
||||
|
||||
Args:
|
||||
server_names: list of servers which we want to look up
|
||||
|
||||
Returns:
|
||||
Resolves once all key lookups for the given servers have
|
||||
completed. Follows the synapse rules of logcontext preservation.
|
||||
"""
|
||||
loop_count = 1
|
||||
while True:
|
||||
wait_on = [
|
||||
(server_name, self.key_downloads[server_name])
|
||||
for server_name in server_names
|
||||
if server_name in self.key_downloads
|
||||
]
|
||||
if not wait_on:
|
||||
break
|
||||
logger.info(
|
||||
"Waiting for existing lookups for %s to complete [loop %i]",
|
||||
[w[0] for w in wait_on],
|
||||
loop_count,
|
||||
)
|
||||
with PreserveLoggingContext():
|
||||
await defer.DeferredList((w[1] for w in wait_on))
|
||||
|
||||
loop_count += 1
|
||||
|
||||
def _get_server_verify_keys(self, verify_requests: List[VerifyJsonRequest]) -> None:
|
||||
"""Tries to find at least one key for each verify request
|
||||
|
||||
For each verify_request, verify_request.key_ready is called back with
|
||||
params (server_name, key_id, VerifyKey) if a key is found, or errbacked
|
||||
with a SynapseError if none of the keys are found.
|
||||
|
||||
Args:
|
||||
verify_requests: list of verify requests
|
||||
"""
|
||||
|
||||
remaining_requests = {rq for rq in verify_requests if not rq.key_ready.called}
|
||||
|
||||
async def do_iterations():
|
||||
try:
|
||||
with Measure(self.clock, "get_server_verify_keys"):
|
||||
for f in self._key_fetchers:
|
||||
if not remaining_requests:
|
||||
return
|
||||
await self._attempt_key_fetches_with_fetcher(
|
||||
f, remaining_requests
|
||||
)
|
||||
|
||||
# look for any requests which weren't satisfied
|
||||
while remaining_requests:
|
||||
verify_request = remaining_requests.pop()
|
||||
rq_str = (
|
||||
"VerifyJsonRequest(server=%s, key_ids=%s, min_valid=%i)"
|
||||
% (
|
||||
verify_request.server_name,
|
||||
verify_request.key_ids,
|
||||
verify_request.minimum_valid_until_ts,
|
||||
)
|
||||
)
|
||||
|
||||
# If we run the errback immediately, it may cancel our
|
||||
# loggingcontext while we are still in it, so instead we
|
||||
# schedule it for the next time round the reactor.
|
||||
#
|
||||
# (this also ensures that we don't get a stack overflow if we
|
||||
# has a massive queue of lookups waiting for this server).
|
||||
self.clock.call_later(
|
||||
0,
|
||||
verify_request.key_ready.errback,
|
||||
SynapseError(
|
||||
401,
|
||||
"Failed to find any key to satisfy %s" % (rq_str,),
|
||||
Codes.UNAUTHORIZED,
|
||||
),
|
||||
)
|
||||
except Exception as err:
|
||||
# we don't really expect to get here, because any errors should already
|
||||
# have been caught and logged. But if we do, let's log the error and make
|
||||
# sure that all of the deferreds are resolved.
|
||||
logger.error("Unexpected error in _get_server_verify_keys: %s", err)
|
||||
with PreserveLoggingContext():
|
||||
for verify_request in remaining_requests:
|
||||
if not verify_request.key_ready.called:
|
||||
verify_request.key_ready.errback(err)
|
||||
|
||||
run_in_background(do_iterations)
|
||||
|
||||
async def _attempt_key_fetches_with_fetcher(
|
||||
self, fetcher: "KeyFetcher", remaining_requests: Set[VerifyJsonRequest]
|
||||
):
|
||||
"""Use a key fetcher to attempt to satisfy some key requests
|
||||
|
||||
Args:
|
||||
fetcher: fetcher to use to fetch the keys
|
||||
remaining_requests: outstanding key requests.
|
||||
Any successfully-completed requests will be removed from the list.
|
||||
"""
|
||||
# The keys to fetch.
|
||||
# server_name -> key_id -> min_valid_ts
|
||||
missing_keys = defaultdict(dict) # type: Dict[str, Dict[str, int]]
|
||||
|
||||
for verify_request in remaining_requests:
|
||||
# any completed requests should already have been removed
|
||||
assert not verify_request.key_ready.called
|
||||
keys_for_server = missing_keys[verify_request.server_name]
|
||||
|
||||
for key_id in verify_request.key_ids:
|
||||
# If we have several requests for the same key, then we only need to
|
||||
# request that key once, but we should do so with the greatest
|
||||
# min_valid_until_ts of the requests, so that we can satisfy all of
|
||||
# the requests.
|
||||
keys_for_server[key_id] = max(
|
||||
keys_for_server.get(key_id, -1),
|
||||
verify_request.minimum_valid_until_ts,
|
||||
)
|
||||
|
||||
for key_id, key in keys.items():
|
||||
if not key:
|
||||
continue
|
||||
results = await fetcher.get_keys(missing_keys)
|
||||
|
||||
if key.valid_until_ts < verify_request.minimum_valid_until_ts:
|
||||
continue
|
||||
|
||||
existing_key = found_keys.get(key_id)
|
||||
if existing_key:
|
||||
if key.valid_until_ts <= existing_key.valid_until_ts:
|
||||
continue
|
||||
|
||||
found_keys[key_id] = key
|
||||
|
||||
missing_key_ids.difference_update(found_keys)
|
||||
|
||||
if missing_key_ids:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Missing keys for %s: %s"
|
||||
% (verify_request.server_name, missing_key_ids),
|
||||
Codes.UNAUTHORIZED,
|
||||
)
|
||||
completed = []
|
||||
for verify_request in remaining_requests:
|
||||
server_name = verify_request.server_name
|
||||
|
||||
# see if any of the keys we got this time are sufficient to
|
||||
# complete this VerifyJsonRequest.
|
||||
result_keys = results.get(server_name, {})
|
||||
for key_id in verify_request.key_ids:
|
||||
verify_key = found_keys[key_id].verify_key
|
||||
try:
|
||||
json_object = verify_request.json_object_callback()
|
||||
verify_signed_json(
|
||||
json_object,
|
||||
verify_request.server_name,
|
||||
verify_key,
|
||||
)
|
||||
except SignatureVerifyException as e:
|
||||
logger.debug(
|
||||
"Error verifying signature for %s:%s:%s with key %s: %s",
|
||||
verify_request.server_name,
|
||||
verify_key.alg,
|
||||
verify_key.version,
|
||||
encode_verify_key_base64(verify_key),
|
||||
str(e),
|
||||
)
|
||||
raise SynapseError(
|
||||
401,
|
||||
"Invalid signature for server %s with key %s:%s: %s"
|
||||
% (
|
||||
verify_request.server_name,
|
||||
verify_key.alg,
|
||||
verify_key.version,
|
||||
str(e),
|
||||
),
|
||||
Codes.UNAUTHORIZED,
|
||||
)
|
||||
fetch_key_result = result_keys.get(key_id)
|
||||
if not fetch_key_result:
|
||||
# we didn't get a result for this key
|
||||
continue
|
||||
|
||||
if (
|
||||
fetch_key_result.valid_until_ts
|
||||
< verify_request.minimum_valid_until_ts
|
||||
):
|
||||
# key was not valid at this point
|
||||
continue
|
||||
|
||||
# we have a valid key for this request. If we run the callback
|
||||
# immediately, it may cancel our loggingcontext while we are still in
|
||||
# it, so instead we schedule it for the next time round the reactor.
|
||||
#
|
||||
# (this also ensures that we don't get a stack overflow if we had
|
||||
# a massive queue of lookups waiting for this server).
|
||||
logger.debug(
|
||||
"Found key %s:%s for %s",
|
||||
server_name,
|
||||
key_id,
|
||||
verify_request.request_name,
|
||||
)
|
||||
self.clock.call_later(
|
||||
0,
|
||||
verify_request.key_ready.callback,
|
||||
(server_name, key_id, fetch_key_result.verify_key),
|
||||
)
|
||||
completed.append(verify_request)
|
||||
break
|
||||
|
||||
remaining_requests.difference_update(completed)
|
||||
|
||||
|
||||
class KeyFetcher(metaclass=abc.ABCMeta):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._queue = _Queue(self.__class__.__name__, hs.get_clock(), self._fetch_keys)
|
||||
|
||||
async def get_keys(
|
||||
self, server_name: str, key_ids: List[str], minimum_valid_until_ts: int
|
||||
) -> Dict[str, FetchKeyResult]:
|
||||
return await self._queue.add_to_queue(
|
||||
_QueueValue(
|
||||
server_name=server_name,
|
||||
key_ids=key_ids,
|
||||
minimum_valid_until_ts=minimum_valid_until_ts,
|
||||
)
|
||||
)
|
||||
|
||||
@abc.abstractmethod
|
||||
async def _fetch_keys(
|
||||
self, keys_to_fetch: List[_QueueValue]
|
||||
async def get_keys(
|
||||
self, keys_to_fetch: Dict[str, Dict[str, int]]
|
||||
) -> Dict[str, Dict[str, FetchKeyResult]]:
|
||||
pass
|
||||
"""
|
||||
Args:
|
||||
keys_to_fetch:
|
||||
the keys to be fetched. server_name -> key_id -> min_valid_ts
|
||||
|
||||
Returns:
|
||||
Map from server_name -> key_id -> FetchKeyResult
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class StoreKeyFetcher(KeyFetcher):
|
||||
"""KeyFetcher impl which fetches keys from our data store"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
async def _fetch_keys(self, keys_to_fetch: List[_QueueValue]):
|
||||
async def get_keys(
|
||||
self, keys_to_fetch: Dict[str, Dict[str, int]]
|
||||
) -> Dict[str, Dict[str, FetchKeyResult]]:
|
||||
"""see KeyFetcher.get_keys"""
|
||||
|
||||
key_ids_to_fetch = (
|
||||
(queue_value.server_name, key_id)
|
||||
for queue_value in keys_to_fetch
|
||||
for key_id in queue_value.key_ids
|
||||
(server_name, key_id)
|
||||
for server_name, keys_for_server in keys_to_fetch.items()
|
||||
for key_id in keys_for_server.keys()
|
||||
)
|
||||
|
||||
res = await self.store.get_server_verify_keys(key_ids_to_fetch)
|
||||
@@ -351,8 +500,6 @@ class StoreKeyFetcher(KeyFetcher):
|
||||
|
||||
class BaseV2KeyFetcher(KeyFetcher):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.config = hs.config
|
||||
|
||||
@@ -460,10 +607,10 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
||||
self.client = hs.get_federation_http_client()
|
||||
self.key_servers = self.config.key_servers
|
||||
|
||||
async def _fetch_keys(
|
||||
self, keys_to_fetch: List[_QueueValue]
|
||||
async def get_keys(
|
||||
self, keys_to_fetch: Dict[str, Dict[str, int]]
|
||||
) -> Dict[str, Dict[str, FetchKeyResult]]:
|
||||
"""see KeyFetcher._fetch_keys"""
|
||||
"""see KeyFetcher.get_keys"""
|
||||
|
||||
async def get_key(key_server: TrustedKeyServer) -> Dict:
|
||||
try:
|
||||
@@ -499,12 +646,12 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
||||
return union_of_keys
|
||||
|
||||
async def get_server_verify_key_v2_indirect(
|
||||
self, keys_to_fetch: List[_QueueValue], key_server: TrustedKeyServer
|
||||
self, keys_to_fetch: Dict[str, Dict[str, int]], key_server: TrustedKeyServer
|
||||
) -> Dict[str, Dict[str, FetchKeyResult]]:
|
||||
"""
|
||||
Args:
|
||||
keys_to_fetch:
|
||||
the keys to be fetched.
|
||||
the keys to be fetched. server_name -> key_id -> min_valid_ts
|
||||
|
||||
key_server: notary server to query for the keys
|
||||
|
||||
@@ -518,7 +665,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
||||
perspective_name = key_server.server_name
|
||||
logger.info(
|
||||
"Requesting keys %s from notary server %s",
|
||||
keys_to_fetch,
|
||||
keys_to_fetch.items(),
|
||||
perspective_name,
|
||||
)
|
||||
|
||||
@@ -528,13 +675,11 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
||||
path="/_matrix/key/v2/query",
|
||||
data={
|
||||
"server_keys": {
|
||||
queue_value.server_name: {
|
||||
key_id: {
|
||||
"minimum_valid_until_ts": queue_value.minimum_valid_until_ts,
|
||||
}
|
||||
for key_id in queue_value.key_ids
|
||||
server_name: {
|
||||
key_id: {"minimum_valid_until_ts": min_valid_ts}
|
||||
for key_id, min_valid_ts in server_keys.items()
|
||||
}
|
||||
for queue_value in keys_to_fetch
|
||||
for server_name, server_keys in keys_to_fetch.items()
|
||||
}
|
||||
},
|
||||
)
|
||||
@@ -634,8 +779,8 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||
self.clock = hs.get_clock()
|
||||
self.client = hs.get_federation_http_client()
|
||||
|
||||
async def _fetch_keys(
|
||||
self, keys_to_fetch: List[_QueueValue]
|
||||
async def get_keys(
|
||||
self, keys_to_fetch: Dict[str, Dict[str, int]]
|
||||
) -> Dict[str, Dict[str, FetchKeyResult]]:
|
||||
"""
|
||||
Args:
|
||||
@@ -648,10 +793,8 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||
|
||||
results = {}
|
||||
|
||||
async def get_key(key_to_fetch_item: _QueueValue) -> None:
|
||||
server_name = key_to_fetch_item.server_name
|
||||
key_ids = key_to_fetch_item.key_ids
|
||||
|
||||
async def get_key(key_to_fetch_item: Tuple[str, Dict[str, int]]) -> None:
|
||||
server_name, key_ids = key_to_fetch_item
|
||||
try:
|
||||
keys = await self.get_server_verify_key_v2_direct(server_name, key_ids)
|
||||
results[server_name] = keys
|
||||
@@ -662,7 +805,7 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||
except Exception:
|
||||
logger.exception("Error getting keys %s from %s", key_ids, server_name)
|
||||
|
||||
await yieldable_gather_results(get_key, keys_to_fetch)
|
||||
await yieldable_gather_results(get_key, keys_to_fetch.items())
|
||||
return results
|
||||
|
||||
async def get_server_verify_key_v2_direct(
|
||||
@@ -734,3 +877,37 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||
keys.update(response_keys)
|
||||
|
||||
return keys
|
||||
|
||||
|
||||
async def _handle_key_deferred(verify_request: VerifyJsonRequest) -> None:
|
||||
"""Waits for the key to become available, and then performs a verification
|
||||
|
||||
Args:
|
||||
verify_request:
|
||||
|
||||
Raises:
|
||||
SynapseError if there was a problem performing the verification
|
||||
"""
|
||||
server_name = verify_request.server_name
|
||||
with PreserveLoggingContext():
|
||||
_, key_id, verify_key = await verify_request.key_ready
|
||||
|
||||
json_object = verify_request.json_object
|
||||
|
||||
try:
|
||||
verify_signed_json(json_object, server_name, verify_key)
|
||||
except SignatureVerifyException as e:
|
||||
logger.debug(
|
||||
"Error verifying signature for %s:%s:%s with key %s: %s",
|
||||
server_name,
|
||||
verify_key.alg,
|
||||
verify_key.version,
|
||||
encode_verify_key_base64(verify_key),
|
||||
str(e),
|
||||
)
|
||||
raise SynapseError(
|
||||
401,
|
||||
"Invalid signature for server %s with key %s:%s: %s"
|
||||
% (server_name, verify_key.alg, verify_key.version, str(e)),
|
||||
Codes.UNAUTHORIZED,
|
||||
)
|
||||
|
||||
@@ -418,9 +418,7 @@ def get_send_level(
|
||||
def _can_send_event(event: EventBase, auth_events: StateMap[EventBase]) -> bool:
|
||||
power_levels_event = _get_power_level_event(auth_events)
|
||||
|
||||
send_level = get_send_level(
|
||||
event.type, getattr(event, "state_key", None), power_levels_event
|
||||
)
|
||||
send_level = get_send_level(event.type, event.get("state_key"), power_levels_event)
|
||||
user_level = get_user_power_level(event.user_id, auth_events)
|
||||
|
||||
if user_level < send_level:
|
||||
|
||||
@@ -16,15 +16,12 @@
|
||||
|
||||
import abc
|
||||
import os
|
||||
import zlib
|
||||
from typing import Dict, List, Optional, Tuple, Type, Union
|
||||
from typing import Dict, Optional, Tuple, Type
|
||||
|
||||
import attr
|
||||
from unpaddedbase64 import decode_base64, encode_base64
|
||||
from unpaddedbase64 import encode_base64
|
||||
|
||||
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
|
||||
from synapse.types import JsonDict, RoomStreamToken
|
||||
from synapse.util import json_decoder, json_encoder
|
||||
from synapse.util.caches import intern_dict
|
||||
from synapse.util.frozenutils import freeze
|
||||
from synapse.util.stringutils import strtobool
|
||||
@@ -40,26 +37,6 @@ from synapse.util.stringutils import strtobool
|
||||
USE_FROZEN_DICTS = strtobool(os.environ.get("SYNAPSE_USE_FROZEN_DICTS", "0"))
|
||||
|
||||
|
||||
_PRESET_ZDICT = b"""{"auth_events":[],"prev_events":[],"type":"m.room.member",m.room.message"room_id":,"sender":,"content":{"msgtype":"m.text","body":""room_version":"creator":"depth":"prev_state":"state_key":""origin":"origin_server_ts":"hashes":{"sha256":"signatures":,"unsigned":{"age_ts":"ed25519"""
|
||||
|
||||
|
||||
def _encode_dict(d: JsonDict) -> bytes:
|
||||
json_bytes = json_encoder.encode(d).encode("utf-8")
|
||||
c = zlib.compressobj(1, zdict=_PRESET_ZDICT)
|
||||
result_bytes = c.compress(json_bytes)
|
||||
result_bytes += c.flush()
|
||||
return result_bytes
|
||||
|
||||
|
||||
def _decode_dict(b: bytes) -> JsonDict:
|
||||
d = zlib.decompressobj(zdict=_PRESET_ZDICT)
|
||||
|
||||
result_bytes = d.decompress(b)
|
||||
result_bytes += d.flush()
|
||||
|
||||
return json_decoder.decode(result_bytes.decode("utf-8"))
|
||||
|
||||
|
||||
class DictProperty:
|
||||
"""An object property which delegates to the `_dict` within its parent object."""
|
||||
|
||||
@@ -228,81 +205,7 @@ class _EventInternalMetadata:
|
||||
return self._dict.get("redacted", False)
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class _Signatures:
|
||||
_signatures_bytes: bytes
|
||||
|
||||
@staticmethod
|
||||
def from_dict(signature_dict: JsonDict) -> "_Signatures":
|
||||
return _Signatures(_encode_dict(signature_dict))
|
||||
|
||||
def get_dict(self) -> JsonDict:
|
||||
return _decode_dict(self._signatures_bytes)
|
||||
|
||||
def get(self, server_name, default=None):
|
||||
return self.get_dict().get(server_name, default)
|
||||
|
||||
def update(self, other: Union[JsonDict, "_Signatures"]):
|
||||
if isinstance(other, _Signatures):
|
||||
other_dict = _decode_dict(other._signatures_bytes)
|
||||
else:
|
||||
other_dict = other
|
||||
|
||||
signatures = self.get_dict()
|
||||
signatures.update(other_dict)
|
||||
self._signatures_bytes = _encode_dict(signatures)
|
||||
|
||||
|
||||
class _SmallListV1(str):
|
||||
__slots__ = []
|
||||
|
||||
def get(self):
|
||||
return self.split(",")
|
||||
|
||||
@staticmethod
|
||||
def create(event_ids):
|
||||
return _SmallListV1(",".join(event_ids))
|
||||
|
||||
|
||||
class _SmallListV2_V3(bytes):
|
||||
__slots__ = []
|
||||
|
||||
def get(self, url_safe):
|
||||
i = 0
|
||||
while i * 32 < len(self):
|
||||
bit = self[i * 32 : (i + 1) * 32]
|
||||
i += 1
|
||||
yield "$" + encode_base64(bit, urlsafe=url_safe)
|
||||
|
||||
@staticmethod
|
||||
def create(event_ids):
|
||||
return _SmallListV2_V3(
|
||||
b"".join(decode_base64(event_id[1:]) for event_id in event_ids)
|
||||
)
|
||||
|
||||
|
||||
class EventBase(metaclass=abc.ABCMeta):
|
||||
__slots__ = [
|
||||
"room_version",
|
||||
"signatures",
|
||||
"unsigned",
|
||||
"rejected_reason",
|
||||
"_encoded_dict",
|
||||
"_auth_event_ids",
|
||||
"depth",
|
||||
"_content",
|
||||
"_hashes",
|
||||
"origin",
|
||||
"origin_server_ts",
|
||||
"_prev_event_ids",
|
||||
"redacts",
|
||||
"room_id",
|
||||
"sender",
|
||||
"type",
|
||||
"state_key",
|
||||
"internal_metadata",
|
||||
]
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def format_version(self) -> int:
|
||||
@@ -321,44 +224,32 @@ class EventBase(metaclass=abc.ABCMeta):
|
||||
assert room_version.event_format == self.format_version
|
||||
|
||||
self.room_version = room_version
|
||||
self.signatures = _Signatures.from_dict(signatures)
|
||||
self.signatures = signatures
|
||||
self.unsigned = unsigned
|
||||
self.rejected_reason = rejected_reason
|
||||
|
||||
self._encoded_dict = _encode_dict(event_dict)
|
||||
|
||||
self.depth = event_dict["depth"]
|
||||
self.origin = event_dict.get("origin")
|
||||
self.origin_server_ts = event_dict["origin_server_ts"]
|
||||
self.redacts = event_dict.get("redacts")
|
||||
self.room_id = event_dict["room_id"]
|
||||
self.sender = event_dict["sender"]
|
||||
self.type = event_dict["type"]
|
||||
if "state_key" in event_dict:
|
||||
self.state_key = event_dict["state_key"]
|
||||
self._dict = event_dict
|
||||
|
||||
self.internal_metadata = _EventInternalMetadata(internal_metadata_dict)
|
||||
|
||||
@property
|
||||
def content(self) -> JsonDict:
|
||||
return self.get_dict()["content"]
|
||||
|
||||
@property
|
||||
def hashes(self) -> JsonDict:
|
||||
return self.get_dict()["hashes"]
|
||||
|
||||
@property
|
||||
def prev_events(self) -> List[str]:
|
||||
return list(self._prev_events)
|
||||
auth_events = DictProperty("auth_events")
|
||||
depth = DictProperty("depth")
|
||||
content = DictProperty("content")
|
||||
hashes = DictProperty("hashes")
|
||||
origin = DictProperty("origin")
|
||||
origin_server_ts = DictProperty("origin_server_ts")
|
||||
prev_events = DictProperty("prev_events")
|
||||
redacts = DefaultDictProperty("redacts", None)
|
||||
room_id = DictProperty("room_id")
|
||||
sender = DictProperty("sender")
|
||||
state_key = DictProperty("state_key")
|
||||
type = DictProperty("type")
|
||||
user_id = DictProperty("sender")
|
||||
|
||||
@property
|
||||
def event_id(self) -> str:
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def user_id(self) -> str:
|
||||
return self.sender
|
||||
|
||||
@property
|
||||
def membership(self):
|
||||
return self.content["membership"]
|
||||
@@ -367,13 +258,17 @@ class EventBase(metaclass=abc.ABCMeta):
|
||||
return hasattr(self, "state_key") and self.state_key is not None
|
||||
|
||||
def get_dict(self) -> JsonDict:
|
||||
d = _decode_dict(self._encoded_dict)
|
||||
d.update(
|
||||
{"signatures": self.signatures.get_dict(), "unsigned": dict(self.unsigned)}
|
||||
)
|
||||
d = dict(self._dict)
|
||||
d.update({"signatures": self.signatures, "unsigned": dict(self.unsigned)})
|
||||
|
||||
return d
|
||||
|
||||
def get(self, key, default=None):
|
||||
return self._dict.get(key, default)
|
||||
|
||||
def get_internal_metadata_dict(self):
|
||||
return self.internal_metadata.get_dict()
|
||||
|
||||
def get_pdu_json(self, time_now=None) -> JsonDict:
|
||||
pdu_json = self.get_dict()
|
||||
|
||||
@@ -390,11 +285,41 @@ class EventBase(metaclass=abc.ABCMeta):
|
||||
def __set__(self, instance, value):
|
||||
raise AttributeError("Unrecognized attribute %s" % (instance,))
|
||||
|
||||
def __getitem__(self, field):
|
||||
return self._dict[field]
|
||||
|
||||
def __contains__(self, field):
|
||||
return field in self._dict
|
||||
|
||||
def items(self):
|
||||
return list(self._dict.items())
|
||||
|
||||
def keys(self):
|
||||
return self._dict.keys()
|
||||
|
||||
def prev_event_ids(self):
|
||||
"""Returns the list of prev event IDs. The order matches the order
|
||||
specified in the event, though there is no meaning to it.
|
||||
|
||||
Returns:
|
||||
list[str]: The list of event IDs of this event's prev_events
|
||||
"""
|
||||
return [e for e, _ in self.prev_events]
|
||||
|
||||
def auth_event_ids(self):
|
||||
"""Returns the list of auth event IDs. The order matches the order
|
||||
specified in the event, though there is no meaning to it.
|
||||
|
||||
Returns:
|
||||
list[str]: The list of event IDs of this event's auth_events
|
||||
"""
|
||||
return [e for e, _ in self.auth_events]
|
||||
|
||||
def freeze(self):
|
||||
"""'Freeze' the event dict, so it cannot be modified by accident"""
|
||||
|
||||
# this will be a no-op if the event dict is already frozen.
|
||||
# self._dict = freeze(self._dict)
|
||||
self._dict = freeze(self._dict)
|
||||
|
||||
|
||||
class FrozenEvent(EventBase):
|
||||
@@ -430,12 +355,6 @@ class FrozenEvent(EventBase):
|
||||
frozen_dict = event_dict
|
||||
|
||||
self._event_id = event_dict["event_id"]
|
||||
self._auth_event_ids = _SmallListV1.create(
|
||||
e for e, _ in event_dict["auth_events"]
|
||||
)
|
||||
self._prev_event_ids = _SmallListV1.create(
|
||||
e for e, _ in event_dict["prev_events"]
|
||||
)
|
||||
|
||||
super().__init__(
|
||||
frozen_dict,
|
||||
@@ -450,26 +369,18 @@ class FrozenEvent(EventBase):
|
||||
def event_id(self) -> str:
|
||||
return self._event_id
|
||||
|
||||
def auth_event_ids(self):
|
||||
return list(self._auth_event_ids.get())
|
||||
|
||||
def prev_event_ids(self):
|
||||
return list(self._prev_event_ids.get())
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
def __repr__(self):
|
||||
return "<FrozenEvent event_id=%r, type=%r, state_key=%r>" % (
|
||||
self.event_id,
|
||||
self.type,
|
||||
getattr(self, "state_key", None),
|
||||
self.get("event_id", None),
|
||||
self.get("type", None),
|
||||
self.get("state_key", None),
|
||||
)
|
||||
|
||||
|
||||
class FrozenEventV2(EventBase):
|
||||
__slots__ = ["_event_id"]
|
||||
|
||||
format_version = EventFormatVersions.V2 # All events of this type are V2
|
||||
|
||||
def __init__(
|
||||
@@ -504,8 +415,6 @@ class FrozenEventV2(EventBase):
|
||||
frozen_dict = event_dict
|
||||
|
||||
self._event_id = None
|
||||
self._auth_event_ids = _SmallListV2_V3.create(event_dict["auth_events"])
|
||||
self._prev_event_ids = _SmallListV2_V3.create(event_dict["prev_events"])
|
||||
|
||||
super().__init__(
|
||||
frozen_dict,
|
||||
@@ -527,6 +436,24 @@ class FrozenEventV2(EventBase):
|
||||
self._event_id = "$" + encode_base64(compute_event_reference_hash(self)[1])
|
||||
return self._event_id
|
||||
|
||||
def prev_event_ids(self):
|
||||
"""Returns the list of prev event IDs. The order matches the order
|
||||
specified in the event, though there is no meaning to it.
|
||||
|
||||
Returns:
|
||||
list[str]: The list of event IDs of this event's prev_events
|
||||
"""
|
||||
return self.prev_events
|
||||
|
||||
def auth_event_ids(self):
|
||||
"""Returns the list of auth event IDs. The order matches the order
|
||||
specified in the event, though there is no meaning to it.
|
||||
|
||||
Returns:
|
||||
list[str]: The list of event IDs of this event's auth_events
|
||||
"""
|
||||
return self.auth_events
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
@@ -534,22 +461,14 @@ class FrozenEventV2(EventBase):
|
||||
return "<%s event_id=%r, type=%r, state_key=%r>" % (
|
||||
self.__class__.__name__,
|
||||
self.event_id,
|
||||
self.type,
|
||||
self.state_key if self.is_state() else None,
|
||||
self.get("type", None),
|
||||
self.get("state_key", None),
|
||||
)
|
||||
|
||||
def auth_event_ids(self):
|
||||
return list(self._auth_event_ids.get(False))
|
||||
|
||||
def prev_event_ids(self):
|
||||
return list(self._prev_event_ids.get(False))
|
||||
|
||||
|
||||
class FrozenEventV3(FrozenEventV2):
|
||||
"""FrozenEventV3, which differs from FrozenEventV2 only in the event_id format"""
|
||||
|
||||
__slots__ = ["_event_id"]
|
||||
|
||||
format_version = EventFormatVersions.V3 # All events of this type are V3
|
||||
|
||||
@property
|
||||
@@ -565,12 +484,6 @@ class FrozenEventV3(FrozenEventV2):
|
||||
)
|
||||
return self._event_id
|
||||
|
||||
def auth_event_ids(self):
|
||||
return list(self._auth_event_ids.get(True))
|
||||
|
||||
def prev_event_ids(self):
|
||||
return list(self._prev_event_ids.get(True))
|
||||
|
||||
|
||||
def _event_type_from_format_version(format_version: int) -> Type[EventBase]:
|
||||
"""Returns the python type to use to construct an Event object for the
|
||||
|
||||
@@ -38,8 +38,6 @@ class EventValidator:
|
||||
if event.format_version == EventFormatVersions.V1:
|
||||
EventID.from_string(event.event_id)
|
||||
|
||||
event_dict = event.get_dict()
|
||||
|
||||
required = [
|
||||
"auth_events",
|
||||
"content",
|
||||
@@ -51,7 +49,7 @@ class EventValidator:
|
||||
]
|
||||
|
||||
for k in required:
|
||||
if k not in event_dict:
|
||||
if not hasattr(event, k):
|
||||
raise SynapseError(400, "Event does not have key %s" % (k,))
|
||||
|
||||
# Check that the following keys have string values
|
||||
|
||||
@@ -73,10 +73,10 @@ class FederationBase:
|
||||
* throws a SynapseError if the signature check failed.
|
||||
The deferreds run their callbacks in the sentinel
|
||||
"""
|
||||
ctx = current_context()
|
||||
|
||||
deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus)
|
||||
|
||||
ctx = current_context()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def callback(_, pdu: EventBase):
|
||||
with PreserveLoggingContext(ctx):
|
||||
@@ -90,7 +90,9 @@ class FederationBase:
|
||||
# received event was probably a redacted copy (but we then use our
|
||||
# *actual* redacted copy to be on the safe side.)
|
||||
redacted_event = prune_event(pdu)
|
||||
if set(redacted_event.content.keys()) == set(pdu.content.keys()):
|
||||
if set(redacted_event.keys()) == set(pdu.keys()) and set(
|
||||
redacted_event.content.keys()
|
||||
) == set(pdu.content.keys()):
|
||||
logger.info(
|
||||
"Event %s seems to have been redacted; using our redacted "
|
||||
"copy",
|
||||
@@ -135,7 +137,11 @@ class FederationBase:
|
||||
return deferreds
|
||||
|
||||
|
||||
class PduToCheckSig(namedtuple("PduToCheckSig", ["pdu", "sender_domain", "deferreds"])):
|
||||
class PduToCheckSig(
|
||||
namedtuple(
|
||||
"PduToCheckSig", ["pdu", "redacted_pdu_json", "sender_domain", "deferreds"]
|
||||
)
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
@@ -178,6 +184,7 @@ def _check_sigs_on_pdus(
|
||||
pdus_to_check = [
|
||||
PduToCheckSig(
|
||||
pdu=p,
|
||||
redacted_pdu_json=prune_event(p).get_pdu_json(),
|
||||
sender_domain=get_domain_from_id(p.sender),
|
||||
deferreds=[],
|
||||
)
|
||||
@@ -188,12 +195,13 @@ def _check_sigs_on_pdus(
|
||||
# (except if its a 3pid invite, in which case it may be sent by any server)
|
||||
pdus_to_check_sender = [p for p in pdus_to_check if not _is_invite_via_3pid(p.pdu)]
|
||||
|
||||
more_deferreds = keyring.verify_events_for_server(
|
||||
more_deferreds = keyring.verify_json_objects_for_server(
|
||||
[
|
||||
(
|
||||
p.sender_domain,
|
||||
p.pdu,
|
||||
p.redacted_pdu_json,
|
||||
p.pdu.origin_server_ts if room_version.enforce_key_validity else 0,
|
||||
p.pdu.event_id,
|
||||
)
|
||||
for p in pdus_to_check_sender
|
||||
]
|
||||
@@ -222,12 +230,13 @@ def _check_sigs_on_pdus(
|
||||
if p.sender_domain != get_domain_from_id(p.pdu.event_id)
|
||||
]
|
||||
|
||||
more_deferreds = keyring.verify_events_for_server(
|
||||
more_deferreds = keyring.verify_json_objects_for_server(
|
||||
[
|
||||
(
|
||||
get_domain_from_id(p.pdu.event_id),
|
||||
p.pdu,
|
||||
p.redacted_pdu_json,
|
||||
p.pdu.origin_server_ts if room_version.enforce_key_validity else 0,
|
||||
p.pdu.event_id,
|
||||
)
|
||||
for p in pdus_to_check_event_id
|
||||
]
|
||||
|
||||
@@ -33,7 +33,6 @@ from typing import (
|
||||
)
|
||||
|
||||
import attr
|
||||
import ijson
|
||||
from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
@@ -56,16 +55,11 @@ from synapse.api.room_versions import (
|
||||
)
|
||||
from synapse.events import EventBase, builder
|
||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||
from synapse.logging.context import (
|
||||
get_thread_resource_usage,
|
||||
make_deferred_yieldable,
|
||||
preserve_fn,
|
||||
)
|
||||
from synapse.logging.context import make_deferred_yieldable, preserve_fn
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.types import JsonDict, get_domain_from_id
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.iterutils import batch_iter
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -391,6 +385,7 @@ class FederationClient(FederationBase):
|
||||
Returns:
|
||||
A list of PDUs that have valid signatures and hashes.
|
||||
"""
|
||||
deferreds = self._check_sigs_and_hashes(room_version, pdus)
|
||||
|
||||
async def handle_check_result(pdu: EventBase, deferred: Deferred):
|
||||
try:
|
||||
@@ -425,7 +420,6 @@ class FederationClient(FederationBase):
|
||||
return res
|
||||
|
||||
handle = preserve_fn(handle_check_result)
|
||||
deferreds = self._check_sigs_and_hashes(room_version, pdus)
|
||||
deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)]
|
||||
|
||||
valid_pdus = await make_deferred_yieldable(
|
||||
@@ -673,37 +667,19 @@ class FederationClient(FederationBase):
|
||||
async def send_request(destination) -> Dict[str, Any]:
|
||||
content = await self._do_send_join(destination, pdu)
|
||||
|
||||
# logger.debug("Got content: %s", content.getvalue())
|
||||
logger.debug("Got content: %s", content)
|
||||
|
||||
# logger.info("send_join content: %d", len(content))
|
||||
state = [
|
||||
event_from_pdu_json(p, room_version, outlier=True)
|
||||
for p in content.get("state", [])
|
||||
]
|
||||
|
||||
content.seek(0)
|
||||
auth_chain = [
|
||||
event_from_pdu_json(p, room_version, outlier=True)
|
||||
for p in content.get("auth_chain", [])
|
||||
]
|
||||
|
||||
r = get_thread_resource_usage()
|
||||
logger.info("Memory before state: %s", r.ru_maxrss)
|
||||
|
||||
state = []
|
||||
for i, p in enumerate(ijson.items(content, "state.item")):
|
||||
state.append(event_from_pdu_json(p, room_version, outlier=True))
|
||||
if i % 1000 == 999:
|
||||
await self._clock.sleep(0)
|
||||
|
||||
r = get_thread_resource_usage()
|
||||
logger.info("Memory after state: %s", r.ru_maxrss)
|
||||
|
||||
logger.info("Parsed state: %d", len(state))
|
||||
content.seek(0)
|
||||
|
||||
auth_chain = []
|
||||
for i, p in enumerate(ijson.items(content, "auth_chain.item")):
|
||||
auth_chain.append(event_from_pdu_json(p, room_version, outlier=True))
|
||||
if i % 1000 == 999:
|
||||
await self._clock.sleep(0)
|
||||
|
||||
r = get_thread_resource_usage()
|
||||
logger.info("Memory after: %s", r.ru_maxrss)
|
||||
|
||||
logger.info("Parsed auth chain: %d", len(auth_chain))
|
||||
pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
|
||||
|
||||
create_event = None
|
||||
for e in state:
|
||||
@@ -728,19 +704,12 @@ class FederationClient(FederationBase):
|
||||
% (create_room_version,)
|
||||
)
|
||||
|
||||
valid_pdus = []
|
||||
|
||||
for chunk in batch_iter(itertools.chain(state, auth_chain), 1000):
|
||||
logger.info("Handling next _check_sigs_and_hash_and_fetch chunk")
|
||||
new_valid_pdus = await self._check_sigs_and_hash_and_fetch(
|
||||
destination,
|
||||
chunk,
|
||||
outlier=True,
|
||||
room_version=room_version,
|
||||
)
|
||||
valid_pdus.extend(new_valid_pdus)
|
||||
|
||||
logger.info("_check_sigs_and_hash_and_fetch done")
|
||||
valid_pdus = await self._check_sigs_and_hash_and_fetch(
|
||||
destination,
|
||||
list(pdus.values()),
|
||||
outlier=True,
|
||||
room_version=room_version,
|
||||
)
|
||||
|
||||
valid_pdus_map = {p.event_id: p for p in valid_pdus}
|
||||
|
||||
@@ -775,8 +744,6 @@ class FederationClient(FederationBase):
|
||||
% (auth_chain_create_events,)
|
||||
)
|
||||
|
||||
logger.info("Returning from send_join")
|
||||
|
||||
return {
|
||||
"state": signed_state,
|
||||
"auth_chain": signed_auth,
|
||||
@@ -802,8 +769,6 @@ class FederationClient(FederationBase):
|
||||
if not self._is_unknown_endpoint(e):
|
||||
raise
|
||||
|
||||
raise NotImplementedError()
|
||||
|
||||
logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
|
||||
|
||||
resp = await self.transport_layer.send_join_v1(
|
||||
|
||||
@@ -44,7 +44,6 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
UnsupportedRoomVersionError,
|
||||
)
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events import EventBase
|
||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||
@@ -865,14 +864,6 @@ class FederationHandlerRegistry:
|
||||
# EDU received.
|
||||
self._edu_type_to_instance = {} # type: Dict[str, List[str]]
|
||||
|
||||
# A rate limiter for incoming room key requests per origin.
|
||||
self._room_key_request_rate_limiter = Ratelimiter(
|
||||
store=hs.get_datastore(),
|
||||
clock=self.clock,
|
||||
rate_hz=self.config.rc_key_requests.per_second,
|
||||
burst_count=self.config.rc_key_requests.burst_count,
|
||||
)
|
||||
|
||||
def register_edu_handler(
|
||||
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
|
||||
) -> None:
|
||||
@@ -926,16 +917,6 @@ class FederationHandlerRegistry:
|
||||
if not self.config.use_presence and edu_type == EduTypes.Presence:
|
||||
return
|
||||
|
||||
# If the incoming room key requests from a particular origin are over
|
||||
# the limit, drop them.
|
||||
if (
|
||||
edu_type == EduTypes.RoomKeyRequest
|
||||
and not await self._room_key_request_rate_limiter.can_do_action(
|
||||
None, origin
|
||||
)
|
||||
):
|
||||
return
|
||||
|
||||
# Check if we have a handler on this instance
|
||||
handler = self.edu_handlers.get(edu_type)
|
||||
if handler:
|
||||
|
||||
@@ -28,6 +28,7 @@ from synapse.api.presence import UserPresenceState
|
||||
from synapse.events import EventBase
|
||||
from synapse.federation.units import Edu
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.logging import issue9533_logger
|
||||
from synapse.logging.opentracing import SynapseTags, set_tag
|
||||
from synapse.metrics import sent_transactions_counter
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
@@ -574,6 +575,14 @@ class PerDestinationQueue:
|
||||
for content in contents
|
||||
]
|
||||
|
||||
if edus:
|
||||
issue9533_logger.debug(
|
||||
"Sending %i to-device messages to %s, up to stream id %i",
|
||||
len(edus),
|
||||
self._destination,
|
||||
stream_id,
|
||||
)
|
||||
|
||||
return (edus, stream_id)
|
||||
|
||||
def _start_catching_up(self) -> None:
|
||||
|
||||
@@ -244,10 +244,7 @@ class TransportLayerClient:
|
||||
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
|
||||
|
||||
response = await self.client.put_json(
|
||||
destination=destination,
|
||||
path=path,
|
||||
data=content,
|
||||
return_string_io=True,
|
||||
destination=destination, path=path, data=content
|
||||
)
|
||||
|
||||
return response
|
||||
@@ -257,10 +254,7 @@ class TransportLayerClient:
|
||||
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
|
||||
|
||||
response = await self.client.put_json(
|
||||
destination=destination,
|
||||
path=path,
|
||||
data=content,
|
||||
return_string_io=True,
|
||||
destination=destination, path=path, data=content
|
||||
)
|
||||
|
||||
return response
|
||||
@@ -1001,6 +995,7 @@ class TransportLayerClient:
|
||||
returned per space
|
||||
exclude_rooms: a list of any rooms we can skip
|
||||
"""
|
||||
# TODO When switching to the stable endpoint, use GET instead of POST.
|
||||
path = _create_path(
|
||||
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
|
||||
)
|
||||
|
||||
@@ -1376,6 +1376,32 @@ class FederationSpaceSummaryServlet(BaseFederationServlet):
|
||||
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
|
||||
PATH = "/spaces/(?P<room_id>[^/]*)"
|
||||
|
||||
async def on_GET(
|
||||
self,
|
||||
origin: str,
|
||||
content: JsonDict,
|
||||
query: Mapping[bytes, Sequence[bytes]],
|
||||
room_id: str,
|
||||
) -> Tuple[int, JsonDict]:
|
||||
suggested_only = parse_boolean_from_args(query, "suggested_only", default=False)
|
||||
max_rooms_per_space = parse_integer_from_args(query, "max_rooms_per_space")
|
||||
|
||||
exclude_rooms = []
|
||||
if b"exclude_rooms" in query:
|
||||
try:
|
||||
exclude_rooms = [
|
||||
room_id.decode("ascii") for room_id in query[b"exclude_rooms"]
|
||||
]
|
||||
except Exception:
|
||||
raise SynapseError(
|
||||
400, "Bad query parameter for exclude_rooms", Codes.INVALID_PARAM
|
||||
)
|
||||
|
||||
return 200, await self.handler.federation_space_summary(
|
||||
room_id, suggested_only, max_rooms_per_space, exclude_rooms
|
||||
)
|
||||
|
||||
# TODO When switching to the stable endpoint, remove the POST handler.
|
||||
async def on_POST(
|
||||
self,
|
||||
origin: str,
|
||||
|
||||
@@ -17,6 +17,7 @@ import logging
|
||||
import time
|
||||
import unicodedata
|
||||
import urllib.parse
|
||||
from binascii import crc32
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
@@ -34,6 +35,7 @@ from typing import (
|
||||
import attr
|
||||
import bcrypt
|
||||
import pymacaroons
|
||||
import unpaddedbase64
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
@@ -66,6 +68,7 @@ from synapse.util import stringutils as stringutils
|
||||
from synapse.util.async_helpers import maybe_awaitable
|
||||
from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
|
||||
from synapse.util.msisdn import phone_number_to_msisdn
|
||||
from synapse.util.stringutils import base62_encode
|
||||
from synapse.util.threepids import canonicalise_email
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -808,10 +811,12 @@ class AuthHandler(BaseHandler):
|
||||
logger.info(
|
||||
"Logging in user %s as %s%s", user_id, puppets_user_id, fmt_expiry
|
||||
)
|
||||
target_user_id_obj = UserID.from_string(puppets_user_id)
|
||||
else:
|
||||
logger.info(
|
||||
"Logging in user %s on device %s%s", user_id, device_id, fmt_expiry
|
||||
)
|
||||
target_user_id_obj = UserID.from_string(user_id)
|
||||
|
||||
if (
|
||||
not is_appservice_ghost
|
||||
@@ -819,7 +824,7 @@ class AuthHandler(BaseHandler):
|
||||
):
|
||||
await self.auth.check_auth_blocking(user_id)
|
||||
|
||||
access_token = self.macaroon_gen.generate_access_token(user_id)
|
||||
access_token = self.generate_access_token(target_user_id_obj)
|
||||
await self.store.add_access_token_to_user(
|
||||
user_id=user_id,
|
||||
token=access_token,
|
||||
@@ -1192,6 +1197,19 @@ class AuthHandler(BaseHandler):
|
||||
return None
|
||||
return user_id
|
||||
|
||||
def generate_access_token(self, for_user: UserID) -> str:
|
||||
"""Generates an opaque string, for use as an access token"""
|
||||
|
||||
# we use the following format for access tokens:
|
||||
# syt_<base64 local part>_<random string>_<base62 crc check>
|
||||
|
||||
b64local = unpaddedbase64.encode_base64(for_user.localpart.encode("utf-8"))
|
||||
random_string = stringutils.random_string(20)
|
||||
base = f"syt_{b64local}_{random_string}"
|
||||
|
||||
crc = base62_encode(crc32(base.encode("ascii")), minwidth=6)
|
||||
return f"{base}_{crc}"
|
||||
|
||||
async def validate_short_term_login_token(
|
||||
self, login_token: str
|
||||
) -> LoginTokenAttributes:
|
||||
@@ -1585,10 +1603,7 @@ class MacaroonGenerator:
|
||||
|
||||
hs = attr.ib()
|
||||
|
||||
def generate_access_token(
|
||||
self, user_id: str, extra_caveats: Optional[List[str]] = None
|
||||
) -> str:
|
||||
extra_caveats = extra_caveats or []
|
||||
def generate_guest_access_token(self, user_id: str) -> str:
|
||||
macaroon = self._generate_base_macaroon(user_id)
|
||||
macaroon.add_first_party_caveat("type = access")
|
||||
# Include a nonce, to make sure that each login gets a different
|
||||
@@ -1596,8 +1611,7 @@ class MacaroonGenerator:
|
||||
macaroon.add_first_party_caveat(
|
||||
"nonce = %s" % (stringutils.random_string_with_symbols(16),)
|
||||
)
|
||||
for caveat in extra_caveats:
|
||||
macaroon.add_first_party_caveat(caveat)
|
||||
macaroon.add_first_party_caveat("guest = true")
|
||||
return macaroon.serialize()
|
||||
|
||||
def generate_short_term_login_token(
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Dict
|
||||
|
||||
from synapse.api.constants import EduTypes
|
||||
from synapse.api.constants import ToDeviceEventTypes
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.logging.context import run_in_background
|
||||
@@ -79,6 +79,8 @@ class DeviceMessageHandler:
|
||||
ReplicationUserDevicesResyncRestServlet.make_client(hs)
|
||||
)
|
||||
|
||||
# a rate limiter for room key requests. The keys are
|
||||
# (sending_user_id, sending_device_id).
|
||||
self._ratelimiter = Ratelimiter(
|
||||
store=self.store,
|
||||
clock=hs.get_clock(),
|
||||
@@ -100,12 +102,25 @@ class DeviceMessageHandler:
|
||||
for user_id, by_device in content["messages"].items():
|
||||
# we use UserID.from_string to catch invalid user ids
|
||||
if not self.is_mine(UserID.from_string(user_id)):
|
||||
logger.warning("Request for keys for non-local user %s", user_id)
|
||||
logger.warning("To-device message to non-local user %s", user_id)
|
||||
raise SynapseError(400, "Not a user here")
|
||||
|
||||
if not by_device:
|
||||
continue
|
||||
|
||||
# Ratelimit key requests by the sending user.
|
||||
if message_type == ToDeviceEventTypes.RoomKeyRequest:
|
||||
allowed, _ = await self._ratelimiter.can_do_action(
|
||||
None, (sender_user_id, None)
|
||||
)
|
||||
if not allowed:
|
||||
logger.info(
|
||||
"Dropping room_key_request from %s to %s due to rate limit",
|
||||
sender_user_id,
|
||||
user_id,
|
||||
)
|
||||
continue
|
||||
|
||||
messages_by_device = {
|
||||
device_id: {
|
||||
"content": message_content,
|
||||
@@ -192,13 +207,19 @@ class DeviceMessageHandler:
|
||||
for user_id, by_device in messages.items():
|
||||
# Ratelimit local cross-user key requests by the sending device.
|
||||
if (
|
||||
message_type == EduTypes.RoomKeyRequest
|
||||
message_type == ToDeviceEventTypes.RoomKeyRequest
|
||||
and user_id != sender_user_id
|
||||
and await self._ratelimiter.can_do_action(
|
||||
):
|
||||
allowed, _ = await self._ratelimiter.can_do_action(
|
||||
requester, (sender_user_id, requester.device_id)
|
||||
)
|
||||
):
|
||||
continue
|
||||
if not allowed:
|
||||
logger.info(
|
||||
"Dropping room_key_request from %s to %s due to rate limit",
|
||||
sender_user_id,
|
||||
user_id,
|
||||
)
|
||||
continue
|
||||
|
||||
# we use UserID.from_string to catch invalid user ids
|
||||
if self.is_mine(UserID.from_string(user_id)):
|
||||
|
||||
@@ -553,7 +553,11 @@ class FederationHandler(BaseHandler):
|
||||
room_id: str,
|
||||
event_id: str,
|
||||
) -> List[EventBase]:
|
||||
"""Requests all of the room state at a given event from a remote homeserver.
|
||||
"""Requests all of the room state at a given event from a remote
|
||||
homeserver.
|
||||
|
||||
Will also fetch any missing events reported in the `auth_chain_ids`
|
||||
section of `/state_ids`.
|
||||
|
||||
Args:
|
||||
destination: The remote homeserver to query for the state.
|
||||
@@ -561,8 +565,7 @@ class FederationHandler(BaseHandler):
|
||||
event_id: The id of the event we want the state at.
|
||||
|
||||
Returns:
|
||||
A list of events in the state, not including the event itself, and
|
||||
a list of events in the auth chain for the given event.
|
||||
A list of events in the state, not including the event itself.
|
||||
"""
|
||||
(
|
||||
state_event_ids,
|
||||
@@ -571,24 +574,53 @@ class FederationHandler(BaseHandler):
|
||||
destination, room_id, event_id=event_id
|
||||
)
|
||||
|
||||
desired_events = set(state_event_ids + auth_event_ids)
|
||||
# Fetch the state events from the DB, and check we have the auth events.
|
||||
event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
|
||||
auth_events_in_store = await self.store.have_seen_events(auth_event_ids)
|
||||
|
||||
failed_to_fetch = await self._get_events_from_store_or_dest(
|
||||
destination, room_id, desired_events
|
||||
)
|
||||
|
||||
if failed_to_fetch:
|
||||
logger.warning(
|
||||
"Failed to fetch missing state/auth events for %s %s",
|
||||
event_id,
|
||||
failed_to_fetch,
|
||||
# Check for missing events. We handle state and auth event seperately,
|
||||
# as we want to pull the state from the DB, but we don't for the auth
|
||||
# events. (Note: we likely won't use the majority of the auth chain, and
|
||||
# it can be *huge* for large rooms, so it's worth ensuring that we don't
|
||||
# unnecessarily pull it from the DB).
|
||||
missing_state_events = set(state_event_ids) - set(event_map)
|
||||
missing_auth_events = set(auth_event_ids) - set(auth_events_in_store)
|
||||
if missing_state_events or missing_auth_events:
|
||||
await self._get_events_and_persist(
|
||||
destination=destination,
|
||||
room_id=room_id,
|
||||
events=missing_state_events | missing_auth_events,
|
||||
)
|
||||
|
||||
event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
|
||||
if missing_state_events:
|
||||
new_events = await self.store.get_events(
|
||||
missing_state_events, allow_rejected=True
|
||||
)
|
||||
event_map.update(new_events)
|
||||
|
||||
remote_state = [
|
||||
event_map[e_id] for e_id in state_event_ids if e_id in event_map
|
||||
]
|
||||
missing_state_events.difference_update(new_events)
|
||||
|
||||
if missing_state_events:
|
||||
logger.warning(
|
||||
"Failed to fetch missing state events for %s %s",
|
||||
event_id,
|
||||
missing_state_events,
|
||||
)
|
||||
|
||||
if missing_auth_events:
|
||||
auth_events_in_store = await self.store.have_seen_events(
|
||||
missing_auth_events
|
||||
)
|
||||
missing_auth_events.difference_update(auth_events_in_store)
|
||||
|
||||
if missing_auth_events:
|
||||
logger.warning(
|
||||
"Failed to fetch missing auth events for %s %s",
|
||||
event_id,
|
||||
missing_auth_events,
|
||||
)
|
||||
|
||||
remote_state = list(event_map.values())
|
||||
|
||||
# check for events which were in the wrong room.
|
||||
#
|
||||
@@ -596,8 +628,8 @@ class FederationHandler(BaseHandler):
|
||||
# auth_events at an event in room A are actually events in room B
|
||||
|
||||
bad_events = [
|
||||
(event_id, event.room_id)
|
||||
for idx, event in enumerate(remote_state)
|
||||
(event.event_id, event.room_id)
|
||||
for event in remote_state
|
||||
if event.room_id != room_id
|
||||
]
|
||||
|
||||
@@ -619,45 +651,6 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
return remote_state
|
||||
|
||||
async def _get_events_from_store_or_dest(
|
||||
self, destination: str, room_id: str, event_ids: Iterable[str]
|
||||
) -> Set[str]:
|
||||
"""Fetch events from a remote destination, checking if we already have them.
|
||||
|
||||
Persists any events we don't already have as outliers.
|
||||
|
||||
If we fail to fetch any of the events, a warning will be logged, and the event
|
||||
will be omitted from the result. Likewise, any events which turn out not to
|
||||
be in the given room.
|
||||
|
||||
This function *does not* automatically get missing auth events of the
|
||||
newly fetched events. Callers must include the full auth chain of
|
||||
of the missing events in the `event_ids` argument, to ensure that any
|
||||
missing auth events are correctly fetched.
|
||||
|
||||
Returns:
|
||||
map from event_id to event
|
||||
"""
|
||||
have_events = await self.store.have_seen_events(event_ids)
|
||||
|
||||
missing_events = set(event_ids) - have_events
|
||||
|
||||
if not missing_events:
|
||||
return set()
|
||||
|
||||
logger.debug(
|
||||
"Fetching unknown state/auth events %s for room %s",
|
||||
missing_events,
|
||||
room_id,
|
||||
)
|
||||
|
||||
await self._get_events_and_persist(
|
||||
destination=destination, room_id=room_id, events=missing_events
|
||||
)
|
||||
|
||||
new_events = await self.store.have_seen_events(missing_events)
|
||||
return missing_events - new_events
|
||||
|
||||
async def _get_state_after_missing_prev_event(
|
||||
self,
|
||||
destination: str,
|
||||
@@ -1444,7 +1437,7 @@ class FederationHandler(BaseHandler):
|
||||
# room stuff after join currently doesn't work on workers.
|
||||
assert self.config.worker.worker_app is None
|
||||
|
||||
logger.info("Joining %s to %s", joinee, room_id)
|
||||
logger.debug("Joining %s to %s", joinee, room_id)
|
||||
|
||||
origin, event, room_version_obj = await self._make_and_verify_event(
|
||||
target_hosts,
|
||||
@@ -1455,8 +1448,6 @@ class FederationHandler(BaseHandler):
|
||||
params={"ver": KNOWN_ROOM_VERSIONS},
|
||||
)
|
||||
|
||||
logger.info("make_join done from %s", origin)
|
||||
|
||||
# This shouldn't happen, because the RoomMemberHandler has a
|
||||
# linearizer lock which only allows one operation per user per room
|
||||
# at a time - so this is just paranoia.
|
||||
@@ -1476,13 +1467,10 @@ class FederationHandler(BaseHandler):
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
logger.info("Sending join")
|
||||
ret = await self.federation_client.send_join(
|
||||
host_list, event, room_version_obj
|
||||
)
|
||||
|
||||
logger.info("send join done")
|
||||
|
||||
origin = ret["origin"]
|
||||
state = ret["state"]
|
||||
auth_chain = ret["auth_chain"]
|
||||
@@ -1507,14 +1495,10 @@ class FederationHandler(BaseHandler):
|
||||
room_version=room_version_obj,
|
||||
)
|
||||
|
||||
logger.info("Persisting auth true")
|
||||
|
||||
max_stream_id = await self._persist_auth_tree(
|
||||
origin, room_id, auth_chain, state, event, room_version_obj
|
||||
)
|
||||
|
||||
logger.info("Persisted auth true")
|
||||
|
||||
# We wait here until this instance has seen the events come down
|
||||
# replication (if we're using replication) as the below uses caches.
|
||||
await self._replication.wait_for_stream_position(
|
||||
@@ -2167,8 +2151,6 @@ class FederationHandler(BaseHandler):
|
||||
ctx = await self.state_handler.compute_event_context(e)
|
||||
events_to_context[e.event_id] = ctx
|
||||
|
||||
logger.info("Computed contexts")
|
||||
|
||||
event_map = {
|
||||
e.event_id: e for e in itertools.chain(auth_events, state, [event])
|
||||
}
|
||||
@@ -2210,8 +2192,6 @@ class FederationHandler(BaseHandler):
|
||||
else:
|
||||
logger.info("Failed to find auth event %r", e_id)
|
||||
|
||||
logger.info("Got missing events")
|
||||
|
||||
for e in itertools.chain(auth_events, state, [event]):
|
||||
auth_for_e = {
|
||||
(event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
|
||||
@@ -2236,8 +2216,6 @@ class FederationHandler(BaseHandler):
|
||||
raise
|
||||
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
|
||||
|
||||
logger.info("Authed events")
|
||||
|
||||
await self.persist_events_and_notify(
|
||||
room_id,
|
||||
[
|
||||
@@ -2246,14 +2224,10 @@ class FederationHandler(BaseHandler):
|
||||
],
|
||||
)
|
||||
|
||||
logger.info("Persisted events")
|
||||
|
||||
new_event_context = await self.state_handler.compute_event_context(
|
||||
event, old_state=state
|
||||
)
|
||||
|
||||
logger.info("Computed context")
|
||||
|
||||
return await self.persist_events_and_notify(
|
||||
room_id, [(event, new_event_context)]
|
||||
)
|
||||
@@ -2457,7 +2431,9 @@ class FederationHandler(BaseHandler):
|
||||
# If we are going to send this event over federation we precaclculate
|
||||
# the joined hosts.
|
||||
if event.internal_metadata.get_send_on_behalf_of():
|
||||
await self.event_creation_handler.cache_joined_hosts_for_event(event)
|
||||
await self.event_creation_handler.cache_joined_hosts_for_event(
|
||||
event, context
|
||||
)
|
||||
|
||||
return context
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
|
||||
from synapse import event_auth
|
||||
@@ -43,14 +44,15 @@ from synapse.events import EventBase
|
||||
from synapse.events.builder import EventBuilder
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
|
||||
from synapse.util import json_decoder, json_encoder
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util import json_decoder, json_encoder, log_failure
|
||||
from synapse.util.async_helpers import Linearizer, unwrapFirstError
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
@@ -457,6 +459,19 @@ class EventCreationHandler:
|
||||
|
||||
self._external_cache = hs.get_external_cache()
|
||||
|
||||
# Stores the state groups we've recently added to the joined hosts
|
||||
# external cache. Note that the timeout must be significantly less than
|
||||
# the TTL on the external cache.
|
||||
self._external_cache_joined_hosts_updates = (
|
||||
None
|
||||
) # type: Optional[ExpiringCache]
|
||||
if self._external_cache.is_enabled():
|
||||
self._external_cache_joined_hosts_updates = ExpiringCache(
|
||||
"_external_cache_joined_hosts_updates",
|
||||
self.clock,
|
||||
expiry_ms=30 * 60 * 1000,
|
||||
)
|
||||
|
||||
async def create_event(
|
||||
self,
|
||||
requester: Requester,
|
||||
@@ -965,9 +980,43 @@ class EventCreationHandler:
|
||||
logger.exception("Failed to encode content: %r", event.content)
|
||||
raise
|
||||
|
||||
await self.action_generator.handle_push_actions_for_event(event, context)
|
||||
# We now persist the event (and update the cache in parallel, since we
|
||||
# don't want to block on it).
|
||||
result = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(
|
||||
self._persist_event,
|
||||
requester=requester,
|
||||
event=event,
|
||||
context=context,
|
||||
ratelimit=ratelimit,
|
||||
extra_users=extra_users,
|
||||
),
|
||||
run_in_background(
|
||||
self.cache_joined_hosts_for_event, event, context
|
||||
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
|
||||
],
|
||||
consumeErrors=True,
|
||||
)
|
||||
).addErrback(unwrapFirstError)
|
||||
|
||||
await self.cache_joined_hosts_for_event(event)
|
||||
return result[0]
|
||||
|
||||
async def _persist_event(
|
||||
self,
|
||||
requester: Requester,
|
||||
event: EventBase,
|
||||
context: EventContext,
|
||||
ratelimit: bool = True,
|
||||
extra_users: Optional[List[UserID]] = None,
|
||||
) -> EventBase:
|
||||
"""Actually persists the event. Should only be called by
|
||||
`handle_new_client_event`, and see its docstring for documentation of
|
||||
the arguments.
|
||||
"""
|
||||
|
||||
await self.action_generator.handle_push_actions_for_event(event, context)
|
||||
|
||||
try:
|
||||
# If we're a worker we need to hit out to the master.
|
||||
@@ -1008,7 +1057,9 @@ class EventCreationHandler:
|
||||
await self.store.remove_push_actions_from_staging(event.event_id)
|
||||
raise
|
||||
|
||||
async def cache_joined_hosts_for_event(self, event: EventBase) -> None:
|
||||
async def cache_joined_hosts_for_event(
|
||||
self, event: EventBase, context: EventContext
|
||||
) -> None:
|
||||
"""Precalculate the joined hosts at the event, when using Redis, so that
|
||||
external federation senders don't have to recalculate it themselves.
|
||||
"""
|
||||
@@ -1016,6 +1067,9 @@ class EventCreationHandler:
|
||||
if not self._external_cache.is_enabled():
|
||||
return
|
||||
|
||||
# If external cache is enabled we should always have this.
|
||||
assert self._external_cache_joined_hosts_updates is not None
|
||||
|
||||
# We actually store two mappings, event ID -> prev state group,
|
||||
# state group -> joined hosts, which is much more space efficient
|
||||
# than event ID -> joined hosts.
|
||||
@@ -1023,22 +1077,28 @@ class EventCreationHandler:
|
||||
# Note: We have to cache event ID -> prev state group, as we don't
|
||||
# store that in the DB.
|
||||
#
|
||||
# Note: We always set the state group -> joined hosts cache, even if
|
||||
# we already set it, so that the expiry time is reset.
|
||||
# Note: We set the state group -> joined hosts cache if it hasn't been
|
||||
# set for a while, so that the expiry time is reset.
|
||||
|
||||
state_entry = await self.state.resolve_state_groups_for_events(
|
||||
event.room_id, event_ids=event.prev_event_ids()
|
||||
)
|
||||
|
||||
if state_entry.state_group:
|
||||
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
|
||||
|
||||
await self._external_cache.set(
|
||||
"event_to_prev_state_group",
|
||||
event.event_id,
|
||||
state_entry.state_group,
|
||||
expiry_ms=60 * 60 * 1000,
|
||||
)
|
||||
|
||||
if state_entry.state_group in self._external_cache_joined_hosts_updates:
|
||||
return
|
||||
|
||||
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
|
||||
|
||||
# Note that the expiry times must be larger than the expiry time in
|
||||
# _external_cache_joined_hosts_updates.
|
||||
await self._external_cache.set(
|
||||
"get_joined_hosts",
|
||||
str(state_entry.state_group),
|
||||
@@ -1046,6 +1106,8 @@ class EventCreationHandler:
|
||||
expiry_ms=60 * 60 * 1000,
|
||||
)
|
||||
|
||||
self._external_cache_joined_hosts_updates[state_entry.state_group] = None
|
||||
|
||||
async def _validate_canonical_alias(
|
||||
self, directory_handler, room_alias_str: str, expected_room_id: str
|
||||
) -> None:
|
||||
@@ -1108,7 +1170,7 @@ class EventCreationHandler:
|
||||
# it's not a self-redaction (to avoid having to look up whether the
|
||||
# user is actually admin or not).
|
||||
is_admin_redaction = False
|
||||
if event.type == EventTypes.Redaction and event.redacts:
|
||||
if event.type == EventTypes.Redaction:
|
||||
original_event = await self.store.get_event(
|
||||
event.redacts,
|
||||
redact_behaviour=EventRedactBehaviour.AS_IS,
|
||||
@@ -1195,7 +1257,7 @@ class EventCreationHandler:
|
||||
# TODO: Make sure the signatures actually are correct.
|
||||
event.signatures.update(returned_invite.signatures)
|
||||
|
||||
if event.type == EventTypes.Redaction and event.redacts:
|
||||
if event.type == EventTypes.Redaction:
|
||||
original_event = await self.store.get_event(
|
||||
event.redacts,
|
||||
redact_behaviour=EventRedactBehaviour.AS_IS,
|
||||
@@ -1401,7 +1463,7 @@ class EventCreationHandler:
|
||||
]
|
||||
|
||||
for k in immutable_fields:
|
||||
if getattr(builder, k, None) != getattr(original_event, k, None):
|
||||
if getattr(builder, k, None) != original_event.get(k):
|
||||
raise Exception(
|
||||
"Third party rules module created an invalid event: "
|
||||
"cannot change field " + k
|
||||
|
||||
@@ -722,9 +722,7 @@ class RegistrationHandler(BaseHandler):
|
||||
)
|
||||
if is_guest:
|
||||
assert valid_until_ms is None
|
||||
access_token = self.macaroon_gen.generate_access_token(
|
||||
user_id, ["guest = true"]
|
||||
)
|
||||
access_token = self.macaroon_gen.generate_guest_access_token(user_id)
|
||||
else:
|
||||
access_token = await self._auth_handler.get_access_token_for_user_id(
|
||||
user_id,
|
||||
|
||||
@@ -32,7 +32,14 @@ from synapse.api.constants import (
|
||||
RoomCreationPreset,
|
||||
RoomEncryptionAlgorithms,
|
||||
)
|
||||
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
LimitExceededError,
|
||||
NotFoundError,
|
||||
StoreError,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.api.filtering import Filter
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
|
||||
from synapse.events import EventBase
|
||||
@@ -126,10 +133,6 @@ class RoomCreationHandler(BaseHandler):
|
||||
|
||||
self.third_party_event_rules = hs.get_third_party_event_rules()
|
||||
|
||||
self._invite_burst_count = (
|
||||
hs.config.ratelimiting.rc_invites_per_room.burst_count
|
||||
)
|
||||
|
||||
async def upgrade_room(
|
||||
self, requester: Requester, old_room_id: str, new_version: RoomVersion
|
||||
) -> str:
|
||||
@@ -475,7 +478,7 @@ class RoomCreationHandler(BaseHandler):
|
||||
):
|
||||
await self.room_member_handler.update_membership(
|
||||
requester,
|
||||
UserID.from_string(old_event.state_key),
|
||||
UserID.from_string(old_event["state_key"]),
|
||||
new_room_id,
|
||||
"ban",
|
||||
ratelimit=False,
|
||||
@@ -676,8 +679,18 @@ class RoomCreationHandler(BaseHandler):
|
||||
invite_3pid_list = []
|
||||
invite_list = []
|
||||
|
||||
if len(invite_list) + len(invite_3pid_list) > self._invite_burst_count:
|
||||
raise SynapseError(400, "Cannot invite so many users at once")
|
||||
if invite_list or invite_3pid_list:
|
||||
try:
|
||||
# If there are invites in the request, see if the ratelimiting settings
|
||||
# allow that number of invites to be sent from the current user.
|
||||
await self.room_member_handler.ratelimit_multiple_invites(
|
||||
requester,
|
||||
room_id=None,
|
||||
n_invites=len(invite_list) + len(invite_3pid_list),
|
||||
update=False,
|
||||
)
|
||||
except LimitExceededError:
|
||||
raise SynapseError(400, "Cannot invite so many users at once")
|
||||
|
||||
await self.event_creation_handler.assert_accepted_privacy_policy(requester)
|
||||
|
||||
|
||||
@@ -163,6 +163,31 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
async def forget(self, user: UserID, room_id: str) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
async def ratelimit_multiple_invites(
|
||||
self,
|
||||
requester: Optional[Requester],
|
||||
room_id: Optional[str],
|
||||
n_invites: int,
|
||||
update: bool = True,
|
||||
):
|
||||
"""Ratelimit more than one invite sent by the given requester in the given room.
|
||||
|
||||
Args:
|
||||
requester: The requester sending the invites.
|
||||
room_id: The room the invites are being sent in.
|
||||
n_invites: The amount of invites to ratelimit for.
|
||||
update: Whether to update the ratelimiter's cache.
|
||||
|
||||
Raises:
|
||||
LimitExceededError: The requester can't send that many invites in the room.
|
||||
"""
|
||||
await self._invites_per_room_limiter.ratelimit(
|
||||
requester,
|
||||
room_id,
|
||||
update=update,
|
||||
n_actions=n_invites,
|
||||
)
|
||||
|
||||
async def ratelimit_invite(
|
||||
self,
|
||||
requester: Optional[Requester],
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
import re
|
||||
from collections import deque
|
||||
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple, cast
|
||||
|
||||
@@ -226,6 +227,23 @@ class SpaceSummaryHandler:
|
||||
suggested_only: bool,
|
||||
max_children: Optional[int],
|
||||
) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
|
||||
"""
|
||||
Generate a room entry and a list of event entries for a given room.
|
||||
|
||||
Args:
|
||||
requester: The requesting user, or None if this is over federation.
|
||||
room_id: The room ID to summarize.
|
||||
suggested_only: True if only suggested children should be returned.
|
||||
Otherwise, all children are returned.
|
||||
max_children: The maximum number of children to return for this node.
|
||||
|
||||
Returns:
|
||||
A tuple of:
|
||||
An iterable of a single value of the room.
|
||||
|
||||
An iterable of the sorted children events. This may be limited
|
||||
to a maximum size or may include all children.
|
||||
"""
|
||||
if not await self._is_room_accessible(room_id, requester):
|
||||
return (), ()
|
||||
|
||||
@@ -288,6 +306,7 @@ class SpaceSummaryHandler:
|
||||
ev.data
|
||||
for ev in res.events
|
||||
if ev.event_type == EventTypes.MSC1772_SPACE_CHILD
|
||||
or ev.event_type == EventTypes.SpaceChild
|
||||
)
|
||||
|
||||
async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool:
|
||||
@@ -331,7 +350,9 @@ class SpaceSummaryHandler:
|
||||
)
|
||||
|
||||
# TODO: update once MSC1772 lands
|
||||
room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE)
|
||||
room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
|
||||
if not room_type:
|
||||
room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE)
|
||||
|
||||
entry = {
|
||||
"room_id": stats["room_id"],
|
||||
@@ -344,6 +365,7 @@ class SpaceSummaryHandler:
|
||||
stats["history_visibility"] == HistoryVisibility.WORLD_READABLE
|
||||
),
|
||||
"guest_can_join": stats["guest_access"] == "can_join",
|
||||
"creation_ts": create_event.origin_server_ts,
|
||||
"room_type": room_type,
|
||||
}
|
||||
|
||||
@@ -353,6 +375,18 @@ class SpaceSummaryHandler:
|
||||
return room_entry
|
||||
|
||||
async def _get_child_events(self, room_id: str) -> Iterable[EventBase]:
|
||||
"""
|
||||
Get the child events for a given room.
|
||||
|
||||
The returned results are sorted for stability.
|
||||
|
||||
Args:
|
||||
room_id: The room id to get the children of.
|
||||
|
||||
Returns:
|
||||
An iterable of sorted child events.
|
||||
"""
|
||||
|
||||
# look for child rooms/spaces.
|
||||
current_state_ids = await self._store.get_current_state_ids(room_id)
|
||||
|
||||
@@ -360,13 +394,15 @@ class SpaceSummaryHandler:
|
||||
[
|
||||
event_id
|
||||
for key, event_id in current_state_ids.items()
|
||||
# TODO: update once MSC1772 lands
|
||||
# TODO: update once MSC1772 has been FCP for a period of time.
|
||||
if key[0] == EventTypes.MSC1772_SPACE_CHILD
|
||||
or key[0] == EventTypes.SpaceChild
|
||||
]
|
||||
)
|
||||
|
||||
# filter out any events without a "via" (which implies it has been redacted)
|
||||
return (e for e in events if _has_valid_via(e))
|
||||
# filter out any events without a "via" (which implies it has been redacted),
|
||||
# and order to ensure we return stable results.
|
||||
return sorted(filter(_has_valid_via, events), key=_child_events_comparison_key)
|
||||
|
||||
|
||||
@attr.s(frozen=True, slots=True)
|
||||
@@ -392,3 +428,39 @@ def _is_suggested_child_event(edge_event: EventBase) -> bool:
|
||||
return True
|
||||
logger.debug("Ignorning not-suggested child %s", edge_event.state_key)
|
||||
return False
|
||||
|
||||
|
||||
# Order may only contain characters in the range of \x20 (space) to \x7F (~).
|
||||
_INVALID_ORDER_CHARS_RE = re.compile(r"[^\x20-\x7F]")
|
||||
|
||||
|
||||
def _child_events_comparison_key(child: EventBase) -> Tuple[bool, Optional[str], str]:
|
||||
"""
|
||||
Generate a value for comparing two child events for ordering.
|
||||
|
||||
The rules for ordering are supposed to be:
|
||||
|
||||
1. The 'order' key, if it is valid.
|
||||
2. The 'origin_server_ts' of the 'm.room.create' event.
|
||||
3. The 'room_id'.
|
||||
|
||||
But we skip step 2 since we may not have any state from the room.
|
||||
|
||||
Args:
|
||||
child: The event for generating a comparison key.
|
||||
|
||||
Returns:
|
||||
The comparison key as a tuple of:
|
||||
False if the ordering is valid.
|
||||
The ordering field.
|
||||
The room ID.
|
||||
"""
|
||||
order = child.content.get("order")
|
||||
# If order is not a string or doesn't meet the requirements, ignore it.
|
||||
if not isinstance(order, str):
|
||||
order = None
|
||||
elif len(order) > 50 or _INVALID_ORDER_CHARS_RE.search(order):
|
||||
order = None
|
||||
|
||||
# Items without an order come last.
|
||||
return (order is None, order, child.room_id)
|
||||
|
||||
@@ -154,7 +154,6 @@ async def _handle_json_response(
|
||||
request: MatrixFederationRequest,
|
||||
response: IResponse,
|
||||
start_ms: int,
|
||||
return_string_io=False,
|
||||
) -> JsonDict:
|
||||
"""
|
||||
Reads the JSON body of a response, with a timeout
|
||||
@@ -176,12 +175,12 @@ async def _handle_json_response(
|
||||
d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE)
|
||||
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
|
||||
|
||||
await make_deferred_yieldable(d)
|
||||
def parse(_len: int):
|
||||
return json_decoder.decode(buf.getvalue())
|
||||
|
||||
if return_string_io:
|
||||
body = buf
|
||||
else:
|
||||
body = json_decoder.decode(buf.getvalue())
|
||||
d.addCallback(parse)
|
||||
|
||||
body = await make_deferred_yieldable(d)
|
||||
except BodyExceededMaxSize as e:
|
||||
# The response was too big.
|
||||
logger.warning(
|
||||
@@ -226,13 +225,12 @@ async def _handle_json_response(
|
||||
time_taken_secs = reactor.seconds() - start_ms / 1000
|
||||
|
||||
logger.info(
|
||||
"{%s} [%s] Completed request: %d %s in %.2f secs got %dB - %s %s",
|
||||
"{%s} [%s] Completed request: %d %s in %.2f secs - %s %s",
|
||||
request.txn_id,
|
||||
request.destination,
|
||||
response.code,
|
||||
response.phrase.decode("ascii", errors="replace"),
|
||||
time_taken_secs,
|
||||
len(buf.getvalue()),
|
||||
request.method,
|
||||
request.uri.decode("ascii"),
|
||||
)
|
||||
@@ -685,7 +683,6 @@ class MatrixFederationHttpClient:
|
||||
ignore_backoff: bool = False,
|
||||
backoff_on_404: bool = False,
|
||||
try_trailing_slash_on_400: bool = False,
|
||||
return_string_io=False,
|
||||
) -> Union[JsonDict, list]:
|
||||
"""Sends the specified json data using PUT
|
||||
|
||||
@@ -760,12 +757,7 @@ class MatrixFederationHttpClient:
|
||||
_sec_timeout = self.default_timeout
|
||||
|
||||
body = await _handle_json_response(
|
||||
self.reactor,
|
||||
_sec_timeout,
|
||||
request,
|
||||
response,
|
||||
start_ms,
|
||||
return_string_io=return_string_io,
|
||||
self.reactor, _sec_timeout, request, response, start_ms
|
||||
)
|
||||
|
||||
return body
|
||||
|
||||
@@ -12,8 +12,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# These are imported to allow for nicer logging configuration files.
|
||||
import logging
|
||||
|
||||
from synapse.logging._remote import RemoteHandler
|
||||
from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter
|
||||
|
||||
# These are imported to allow for nicer logging configuration files.
|
||||
__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"]
|
||||
|
||||
# Debug logger for https://github.com/matrix-org/synapse/issues/9533 etc
|
||||
issue9533_logger = logging.getLogger("synapse.9533_debug")
|
||||
|
||||
@@ -53,6 +53,8 @@ def _setup_jemalloc_stats():
|
||||
logger.debug("jemalloc not found")
|
||||
return
|
||||
|
||||
logger.debug("Found jemalloc at %s", jemalloc_path)
|
||||
|
||||
jemalloc = ctypes.CDLL(jemalloc_path)
|
||||
|
||||
def _mallctl(
|
||||
@@ -127,8 +129,8 @@ def _setup_jemalloc_stats():
|
||||
"""
|
||||
try:
|
||||
_mallctl("epoch", read=False, write=1)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.warning("Failed to reload jemalloc stats: %s", e)
|
||||
|
||||
class JemallocCollector:
|
||||
"""Metrics for internal jemalloc stats."""
|
||||
@@ -169,8 +171,9 @@ def _setup_jemalloc_stats():
|
||||
):
|
||||
try:
|
||||
value = _mallctl(f"stats.{t}")
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
# There was an error fetching the value, skip.
|
||||
logger.warning("Failed to read jemalloc stats.%s: %s", t, e)
|
||||
continue
|
||||
|
||||
g.add_metric([t], value=value)
|
||||
@@ -188,4 +191,6 @@ def setup_jemalloc_stats():
|
||||
try:
|
||||
_setup_jemalloc_stats()
|
||||
except Exception as e:
|
||||
# This should only happen if we find the loaded jemalloc library, but
|
||||
# fail to load it somehow (e.g. we somehow picked the wrong version).
|
||||
logger.info("Failed to setup collector to record jemalloc stats: %s", e)
|
||||
|
||||
@@ -38,6 +38,7 @@ from synapse.api.constants import EventTypes, HistoryVisibility, Membership
|
||||
from synapse.api.errors import AuthError
|
||||
from synapse.events import EventBase
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.logging import issue9533_logger
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.logging.opentracing import log_kv, start_active_span
|
||||
from synapse.logging.utils import log_function
|
||||
@@ -277,7 +278,7 @@ class Notifier:
|
||||
event_pos=event_pos,
|
||||
room_id=event.room_id,
|
||||
event_type=event.type,
|
||||
state_key=getattr(event, "state_key", None),
|
||||
state_key=event.get("state_key"),
|
||||
membership=event.content.get("membership"),
|
||||
max_room_stream_token=max_room_stream_token,
|
||||
extra_users=extra_users or [],
|
||||
@@ -426,6 +427,13 @@ class Notifier:
|
||||
for room in rooms:
|
||||
user_streams |= self.room_to_user_streams.get(room, set())
|
||||
|
||||
if stream_key == "to_device_key":
|
||||
issue9533_logger.debug(
|
||||
"to-device messages stream id %s, awaking streams for %s",
|
||||
new_token,
|
||||
users,
|
||||
)
|
||||
|
||||
time_now_ms = self.clock.time_msec()
|
||||
for user_stream in user_streams:
|
||||
try:
|
||||
|
||||
@@ -19,6 +19,7 @@ from typing import Any, Dict, List, Optional, Pattern, Tuple, Union
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import UserID
|
||||
from synapse.util import glob_to_regex, re_word_boundary
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -125,7 +126,7 @@ class PushRuleEvaluatorForEvent:
|
||||
self._power_levels = power_levels
|
||||
|
||||
# Maps strings of e.g. 'content.body' -> event["content"]["body"]
|
||||
self._value_cache = _flatten_dict(event.get_dict())
|
||||
self._value_cache = _flatten_dict(event)
|
||||
|
||||
def matches(
|
||||
self, condition: Dict[str, Any], user_id: str, display_name: str
|
||||
@@ -183,7 +184,7 @@ class PushRuleEvaluatorForEvent:
|
||||
r = regex_cache.get((display_name, False, True), None)
|
||||
if not r:
|
||||
r1 = re.escape(display_name)
|
||||
r1 = _re_word_boundary(r1)
|
||||
r1 = re_word_boundary(r1)
|
||||
r = re.compile(r1, flags=re.IGNORECASE)
|
||||
regex_cache[(display_name, False, True)] = r
|
||||
|
||||
@@ -212,7 +213,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
|
||||
try:
|
||||
r = regex_cache.get((glob, True, word_boundary), None)
|
||||
if not r:
|
||||
r = _glob_to_re(glob, word_boundary)
|
||||
r = glob_to_regex(glob, word_boundary)
|
||||
regex_cache[(glob, True, word_boundary)] = r
|
||||
return bool(r.search(value))
|
||||
except re.error:
|
||||
@@ -220,58 +221,8 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _glob_to_re(glob: str, word_boundary: bool) -> Pattern:
|
||||
"""Generates regex for a given glob.
|
||||
|
||||
Args:
|
||||
glob
|
||||
word_boundary: Whether to match against word boundaries or entire string.
|
||||
"""
|
||||
if IS_GLOB.search(glob):
|
||||
r = re.escape(glob)
|
||||
|
||||
r = r.replace(r"\*", ".*?")
|
||||
r = r.replace(r"\?", ".")
|
||||
|
||||
# handle [abc], [a-z] and [!a-z] style ranges.
|
||||
r = GLOB_REGEX.sub(
|
||||
lambda x: (
|
||||
"[%s%s]" % (x.group(1) and "^" or "", x.group(2).replace(r"\\\-", "-"))
|
||||
),
|
||||
r,
|
||||
)
|
||||
if word_boundary:
|
||||
r = _re_word_boundary(r)
|
||||
|
||||
return re.compile(r, flags=re.IGNORECASE)
|
||||
else:
|
||||
r = "^" + r + "$"
|
||||
|
||||
return re.compile(r, flags=re.IGNORECASE)
|
||||
elif word_boundary:
|
||||
r = re.escape(glob)
|
||||
r = _re_word_boundary(r)
|
||||
|
||||
return re.compile(r, flags=re.IGNORECASE)
|
||||
else:
|
||||
r = "^" + re.escape(glob) + "$"
|
||||
return re.compile(r, flags=re.IGNORECASE)
|
||||
|
||||
|
||||
def _re_word_boundary(r: str) -> str:
|
||||
"""
|
||||
Adds word boundary characters to the start and end of an
|
||||
expression to require that the match occur as a whole word,
|
||||
but do so respecting the fact that strings starting or ending
|
||||
with non-word characters will change word boundaries.
|
||||
"""
|
||||
# we can't use \b as it chokes on unicode. however \W seems to be okay
|
||||
# as shorthand for [^0-9A-Za-z_].
|
||||
return r"(^|\W)%s(\W|$)" % (r,)
|
||||
|
||||
|
||||
def _flatten_dict(
|
||||
d: dict,
|
||||
d: Union[EventBase, dict],
|
||||
prefix: Optional[List[str]] = None,
|
||||
result: Optional[Dict[str, str]] = None,
|
||||
) -> Dict[str, str]:
|
||||
|
||||
@@ -78,7 +78,8 @@ REQUIREMENTS = [
|
||||
# we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
|
||||
# Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
|
||||
# is out in November.)
|
||||
"attrs>=19.1.0",
|
||||
# Note: 21.1.0 broke `/sync`, see #9936
|
||||
"attrs>=19.1.0,!=21.1.0",
|
||||
"netaddr>=0.7.18",
|
||||
"Jinja2>=2.9",
|
||||
"bleach>=1.4.3",
|
||||
|
||||
@@ -51,7 +51,6 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# How long we allow callers to wait for replication updates before timing out.
|
||||
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30
|
||||
|
||||
|
||||
@@ -1020,6 +1020,7 @@ class RoomSpaceSummaryRestServlet(RestServlet):
|
||||
max_rooms_per_space=parse_integer(request, "max_rooms_per_space"),
|
||||
)
|
||||
|
||||
# TODO When switching to the stable endpoint, remove the POST handler.
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
|
||||
@@ -22,7 +22,6 @@ from synapse.crypto.keyring import ServerKeyFetcher
|
||||
from synapse.http.server import DirectServeJsonResource, respond_with_json
|
||||
from synapse.http.servlet import parse_integer, parse_json_object_from_request
|
||||
from synapse.util import json_decoder
|
||||
from synapse.util.async_helpers import yieldable_gather_results
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -214,13 +213,7 @@ class RemoteKey(DirectServeJsonResource):
|
||||
# If there is a cache miss, request the missing keys, then recurse (and
|
||||
# ensure the result is sent).
|
||||
if cache_misses and query_remote_on_cache_miss:
|
||||
await yieldable_gather_results(
|
||||
lambda t: self.fetcher.get_keys(*t),
|
||||
(
|
||||
(server_name, list(keys), 0)
|
||||
for server_name, keys in cache_misses.items()
|
||||
),
|
||||
)
|
||||
await self.fetcher.get_keys(cache_misses)
|
||||
await self.query_keys(request, query, query_remote_on_cache_miss=False)
|
||||
else:
|
||||
signed_keys = []
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
import logging
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from synapse.logging import issue9533_logger
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.replication.tcp.streams import ToDeviceStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
@@ -404,6 +405,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
],
|
||||
)
|
||||
|
||||
if remote_messages_by_destination:
|
||||
issue9533_logger.debug(
|
||||
"Queued outgoing to-device messages with stream_id %i for %s",
|
||||
stream_id,
|
||||
list(remote_messages_by_destination.keys()),
|
||||
)
|
||||
|
||||
async with self._device_inbox_id_gen.get_next() as stream_id:
|
||||
now_ms = self.clock.time_msec()
|
||||
await self.db_pool.runInteraction(
|
||||
@@ -533,6 +541,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
],
|
||||
)
|
||||
|
||||
issue9533_logger.debug(
|
||||
"Stored to-device messages with stream_id %i for %s",
|
||||
stream_id,
|
||||
[
|
||||
(user_id, device_id)
|
||||
for (user_id, messages_by_device) in local_by_user_then_device.items()
|
||||
for device_id in messages_by_device.keys()
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
|
||||
|
||||
@@ -84,7 +84,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
|
||||
if keys:
|
||||
result["keys"] = keys
|
||||
|
||||
device_display_name = device.display_name
|
||||
device_display_name = None
|
||||
if self.hs.config.allow_device_name_lookup_over_federation:
|
||||
device_display_name = device.display_name
|
||||
if device_display_name:
|
||||
result["device_display_name"] = device_display_name
|
||||
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
# Synapse Database Schemas
|
||||
|
||||
These schemas are used as a basis to create brand new Synapse databases, on both
|
||||
SQLite3 and Postgres.
|
||||
|
||||
## Building full schema dumps
|
||||
|
||||
If you want to recreate these schemas, they need to be made from a database that
|
||||
has had all background updates run.
|
||||
|
||||
To do so, use `scripts-dev/make_full_schema.sh`. This will produce new
|
||||
`full.sql.postgres ` and `full.sql.sqlite` files.
|
||||
|
||||
Ensure postgres is installed and your user has the ability to run bash commands
|
||||
such as `createdb`, then call
|
||||
|
||||
./scripts-dev/make_full_schema.sh -p postgres_username -o output_dir/
|
||||
|
||||
There are currently two folders with full-schema snapshots. `16` is a snapshot
|
||||
from 2015, for historical reference. The other contains the most recent full
|
||||
schema snapshot.
|
||||
@@ -26,16 +26,13 @@ from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.storage.database import LoggingDatabaseConnection
|
||||
from synapse.storage.engines import BaseDatabaseEngine
|
||||
from synapse.storage.engines.postgres import PostgresEngine
|
||||
from synapse.storage.schema import SCHEMA_VERSION
|
||||
from synapse.storage.types import Cursor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Remember to update this number every time a change is made to database
|
||||
# schema files, so the users will be informed on server restarts.
|
||||
SCHEMA_VERSION = 59
|
||||
|
||||
dir_path = os.path.abspath(os.path.dirname(__file__))
|
||||
schema_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "schema")
|
||||
|
||||
|
||||
class PrepareDatabaseException(Exception):
|
||||
@@ -167,7 +164,14 @@ def _setup_new_database(
|
||||
|
||||
Example directory structure:
|
||||
|
||||
schema/
|
||||
schema/
|
||||
common/
|
||||
delta/
|
||||
...
|
||||
full_schemas/
|
||||
11/
|
||||
foo.sql
|
||||
main/
|
||||
delta/
|
||||
...
|
||||
full_schemas/
|
||||
@@ -175,15 +179,14 @@ def _setup_new_database(
|
||||
test.sql
|
||||
...
|
||||
11/
|
||||
foo.sql
|
||||
bar.sql
|
||||
...
|
||||
|
||||
In the example foo.sql and bar.sql would be run, and then any delta files
|
||||
for versions strictly greater than 11.
|
||||
|
||||
Note: we apply the full schemas and deltas from the top level `schema/`
|
||||
folder as well those in the data stores specified.
|
||||
Note: we apply the full schemas and deltas from the `schema/common`
|
||||
folder as well those in the databases specified.
|
||||
|
||||
Args:
|
||||
cur: a database cursor
|
||||
@@ -195,12 +198,12 @@ def _setup_new_database(
|
||||
# configured to our liking.
|
||||
database_engine.check_new_database(cur)
|
||||
|
||||
current_dir = os.path.join(dir_path, "schema", "full_schemas")
|
||||
full_schemas_dir = os.path.join(schema_path, "common", "full_schemas")
|
||||
|
||||
# First we find the highest full schema version we have
|
||||
valid_versions = []
|
||||
|
||||
for filename in os.listdir(current_dir):
|
||||
for filename in os.listdir(full_schemas_dir):
|
||||
try:
|
||||
ver = int(filename)
|
||||
except ValueError:
|
||||
@@ -218,15 +221,13 @@ def _setup_new_database(
|
||||
|
||||
logger.debug("Initialising schema v%d", max_current_ver)
|
||||
|
||||
# Now lets find all the full schema files, both in the global schema and
|
||||
# in data store schemas.
|
||||
directories = [os.path.join(current_dir, str(max_current_ver))]
|
||||
# Now let's find all the full schema files, both in the common schema and
|
||||
# in database schemas.
|
||||
directories = [os.path.join(full_schemas_dir, str(max_current_ver))]
|
||||
directories.extend(
|
||||
os.path.join(
|
||||
dir_path,
|
||||
"databases",
|
||||
schema_path,
|
||||
database,
|
||||
"schema",
|
||||
"full_schemas",
|
||||
str(max_current_ver),
|
||||
)
|
||||
@@ -357,6 +358,9 @@ def _upgrade_existing_database(
|
||||
check_database_before_upgrade(cur, database_engine, config)
|
||||
|
||||
start_ver = current_version
|
||||
|
||||
# if we got to this schema version by running a full_schema rather than a series
|
||||
# of deltas, we should not run the deltas for this version.
|
||||
if not upgraded:
|
||||
start_ver += 1
|
||||
|
||||
@@ -385,12 +389,10 @@ def _upgrade_existing_database(
|
||||
# directories for schema updates.
|
||||
|
||||
# First we find the directories to search in
|
||||
delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
|
||||
delta_dir = os.path.join(schema_path, "common", "delta", str(v))
|
||||
directories = [delta_dir]
|
||||
for database in databases:
|
||||
directories.append(
|
||||
os.path.join(dir_path, "databases", database, "schema", "delta", str(v))
|
||||
)
|
||||
directories.append(os.path.join(schema_path, database, "delta", str(v)))
|
||||
|
||||
# Used to check if we have any duplicate file names
|
||||
file_name_counter = Counter() # type: CounterType[str]
|
||||
@@ -621,8 +623,8 @@ def _get_or_create_schema_state(
|
||||
txn: Cursor, database_engine: BaseDatabaseEngine
|
||||
) -> Optional[Tuple[int, List[str], bool]]:
|
||||
# Bluntly try creating the schema_version tables.
|
||||
schema_path = os.path.join(dir_path, "schema", "schema_version.sql")
|
||||
executescript(txn, schema_path)
|
||||
sql_path = os.path.join(schema_path, "common", "schema_version.sql")
|
||||
executescript(txn, sql_path)
|
||||
|
||||
txn.execute("SELECT version, upgraded FROM schema_version")
|
||||
row = txn.fetchone()
|
||||
|
||||
37
synapse/storage/schema/README.md
Normal file
37
synapse/storage/schema/README.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# Synapse Database Schemas
|
||||
|
||||
This directory contains the schema files used to build Synapse databases.
|
||||
|
||||
Synapse supports splitting its datastore across multiple physical databases (which can
|
||||
be useful for large installations), and the schema files are therefore split according
|
||||
to the logical database they are apply to.
|
||||
|
||||
At the time of writing, the following "logical" databases are supported:
|
||||
|
||||
* `state` - used to store Matrix room state (more specifically, `state_groups`,
|
||||
their relationships and contents.)
|
||||
* `main` - stores everything else.
|
||||
|
||||
Addionally, the `common` directory contains schema files for tables which must be
|
||||
present on *all* physical databases.
|
||||
|
||||
## Full schema dumps
|
||||
|
||||
In the `full_schemas` directories, only the most recently-numbered snapshot is useful
|
||||
(`54` at the time of writing). Older snapshots (eg, `16`) are present for historical
|
||||
reference only.
|
||||
|
||||
## Building full schema dumps
|
||||
|
||||
If you want to recreate these schemas, they need to be made from a database that
|
||||
has had all background updates run.
|
||||
|
||||
To do so, use `scripts-dev/make_full_schema.sh`. This will produce new
|
||||
`full.sql.postgres` and `full.sql.sqlite` files.
|
||||
|
||||
Ensure postgres is installed, then run:
|
||||
|
||||
./scripts-dev/make_full_schema.sh -p postgres_username -o output_dir/
|
||||
|
||||
NB at the time of writing, this script predates the split into separate `state`/`main`
|
||||
databases so will require updates to handle that correctly.
|
||||
17
synapse/storage/schema/__init__.py
Normal file
17
synapse/storage/schema/__init__.py
Normal file
@@ -0,0 +1,17 @@
|
||||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Remember to update this number every time a change is made to database
|
||||
# schema files, so the users will be informed on server restarts.
|
||||
SCHEMA_VERSION = 59
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user