mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-09 01:30:18 +00:00
Compare commits
142 Commits
erikj/mino
...
anoa/debug
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5bd0d19b95 | ||
|
|
88bb6c27e1 | ||
|
|
066804f591 | ||
|
|
56b5f1d0ee | ||
|
|
a438950a00 | ||
|
|
2fa55c0cc6 | ||
|
|
c8c926f9c9 | ||
|
|
163f23785a | ||
|
|
5aa6dff99e | ||
|
|
e43e78b985 | ||
|
|
782b811789 | ||
|
|
e913823a22 | ||
|
|
8c75667ad7 | ||
|
|
443162e577 | ||
|
|
4a17a647a9 | ||
|
|
88b41986db | ||
|
|
6d110ddea4 | ||
|
|
c37db0211e | ||
|
|
5e477c1deb | ||
|
|
7581d30e9f | ||
|
|
60724c46b7 | ||
|
|
6a35046363 | ||
|
|
7df04ca0e6 | ||
|
|
beb19cf61a | ||
|
|
d8d91983bc | ||
|
|
ebfcbbff9c | ||
|
|
77d0a4507b | ||
|
|
0de9f9486a | ||
|
|
f9e98176bf | ||
|
|
bd5e555b0d | ||
|
|
900bca9707 | ||
|
|
e55a240681 | ||
|
|
b8cfe79ffc | ||
|
|
8120a238a4 | ||
|
|
37a9873f63 | ||
|
|
e38c44b418 | ||
|
|
1cde4cf3f1 | ||
|
|
2dce68c651 | ||
|
|
9c0775e86a | ||
|
|
69ce55c510 | ||
|
|
54dd28621b | ||
|
|
751d51dd12 | ||
|
|
42ac4ca477 | ||
|
|
6640460d05 | ||
|
|
8f826f98ac | ||
|
|
dc6fb56c5f | ||
|
|
fe593ef990 | ||
|
|
5ec2077bf9 | ||
|
|
156f271867 | ||
|
|
51c094c4ac | ||
|
|
6b0efe73e2 | ||
|
|
39f6595b4a | ||
|
|
885134529f | ||
|
|
7e5f40e771 | ||
|
|
50ea178c20 | ||
|
|
04f4b5f6f8 | ||
|
|
14b2ebe767 | ||
|
|
f9e3a3f4d0 | ||
|
|
aee2bae952 | ||
|
|
87c65576e0 | ||
|
|
06eb5cae08 | ||
|
|
66315d862f | ||
|
|
bbf725e7da | ||
|
|
99bbe177b6 | ||
|
|
20545a2199 | ||
|
|
ce460dc31c | ||
|
|
fb078f921b | ||
|
|
1f5f3ae8b1 | ||
|
|
2bff4457d9 | ||
|
|
1d66dce83e | ||
|
|
54b78a0e3b | ||
|
|
297aaf4816 | ||
|
|
45df9d35a9 | ||
|
|
a27056d539 | ||
|
|
80e580ae92 | ||
|
|
87972f07e5 | ||
|
|
78a15b1f9d | ||
|
|
fe678a0900 | ||
|
|
83b6c69d3d | ||
|
|
31a2116331 | ||
|
|
13892776ef | ||
|
|
8ef8fb2c1c | ||
|
|
43f874055d | ||
|
|
6b0ef34706 | ||
|
|
fe6ab0439d | ||
|
|
fd983fad96 | ||
|
|
7dcbc33a1b | ||
|
|
6a8880b9c3 | ||
|
|
a0178df104 | ||
|
|
6f67a8b570 | ||
|
|
65c73cdfec | ||
|
|
809e8567f6 | ||
|
|
b68041df3d | ||
|
|
b29474e0aa | ||
|
|
27d099edd6 | ||
|
|
2e7fad87d4 | ||
|
|
b2bd54a2e3 | ||
|
|
3ab8e9c293 | ||
|
|
174aaa1d62 | ||
|
|
036c6cea07 | ||
|
|
bbeee33d63 | ||
|
|
cc7ab0d84a | ||
|
|
e4ffb14d57 | ||
|
|
d96ac97d29 | ||
|
|
12d4259000 | ||
|
|
9b06d8f8a6 | ||
|
|
ab0073a6c0 | ||
|
|
2201bc9795 | ||
|
|
cab4a52535 | ||
|
|
b32ac60c22 | ||
|
|
132b673dbe | ||
|
|
3e99528f2b | ||
|
|
380122866f | ||
|
|
1f773eec91 | ||
|
|
7728d87fd7 | ||
|
|
8c75b621bf | ||
|
|
c1156d3e2b | ||
|
|
e66f099ca9 | ||
|
|
bbf8886a05 | ||
|
|
4aea0bd292 | ||
|
|
691659568f | ||
|
|
a301934f46 | ||
|
|
4c2ed3f20e | ||
|
|
af6c389501 | ||
|
|
7b0e2d961c | ||
|
|
fcf4599488 | ||
|
|
7936d2a96e | ||
|
|
509e381afa | ||
|
|
272eee1ae1 | ||
|
|
4f7e4fc2fb | ||
|
|
1fcb9a1a7a | ||
|
|
0bd8cf435e | ||
|
|
9c1b83b007 | ||
|
|
8f6d9c4cf0 | ||
|
|
99eed85a77 | ||
|
|
a90d0dc5c2 | ||
|
|
4fb5f4d0ce | ||
|
|
7b7c3cedf2 | ||
|
|
fc87d2ffb3 | ||
|
|
2b37eabca1 | ||
|
|
0001e8397e | ||
|
|
197b08de35 |
@@ -6,12 +6,7 @@
|
||||
set -ex
|
||||
|
||||
apt-get update
|
||||
apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev zlib1g-dev
|
||||
|
||||
# workaround for https://github.com/jaraco/zipp/issues/40
|
||||
python3.5 -m pip install 'setuptools>=34.4.0'
|
||||
|
||||
python3.5 -m pip install tox
|
||||
apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev zlib1g-dev tox
|
||||
|
||||
export LANG="C.UTF-8"
|
||||
|
||||
|
||||
233
CHANGES.md
233
CHANGES.md
@@ -1,3 +1,236 @@
|
||||
Synapse 1.12.0 (2020-03-23)
|
||||
===========================
|
||||
|
||||
No significant changes since 1.12.0rc1.
|
||||
|
||||
Debian packages and Docker images are rebuilt using the latest versions of
|
||||
dependency libraries, including Twisted 20.3.0. **Please see security advisory
|
||||
below**.
|
||||
|
||||
Security advisory
|
||||
-----------------
|
||||
|
||||
Synapse may be vulnerable to request-smuggling attacks when it is used with a
|
||||
reverse-proxy. The vulnerabilties are fixed in Twisted 20.3.0, and are
|
||||
described in
|
||||
[CVE-2020-10108](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-10108)
|
||||
and
|
||||
[CVE-2020-10109](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-10109).
|
||||
For a good introduction to this class of request-smuggling attacks, see
|
||||
https://portswigger.net/research/http-desync-attacks-request-smuggling-reborn.
|
||||
|
||||
We are not aware of these vulnerabilities being exploited in the wild, and
|
||||
do not believe that they are exploitable with current versions of any reverse
|
||||
proxies. Nevertheless, we recommend that all Synapse administrators ensure that
|
||||
they have the latest versions of the Twisted library to ensure that their
|
||||
installation remains secure.
|
||||
|
||||
* Administrators using the [`matrix.org` Docker
|
||||
image](https://hub.docker.com/r/matrixdotorg/synapse/) or the [Debian/Ubuntu
|
||||
packages from
|
||||
`matrix.org`](https://github.com/matrix-org/synapse/blob/master/INSTALL.md#matrixorg-packages)
|
||||
should ensure that they have version 1.12.0 installed: these images include
|
||||
Twisted 20.3.0.
|
||||
* Administrators who have [installed Synapse from
|
||||
source](https://github.com/matrix-org/synapse/blob/master/INSTALL.md#installing-from-source)
|
||||
should upgrade Twisted within their virtualenv by running:
|
||||
```sh
|
||||
<path_to_virtualenv>/bin/pip install 'Twisted>=20.3.0'
|
||||
```
|
||||
* Administrators who have installed Synapse from distribution packages should
|
||||
consult the information from their distributions.
|
||||
|
||||
The `matrix.org` Synapse instance was not vulnerable to these vulnerabilities.
|
||||
|
||||
Advance notice of change to the default `git` branch for Synapse
|
||||
----------------------------------------------------------------
|
||||
|
||||
Currently, the default `git` branch for Synapse is `master`, which tracks the
|
||||
latest release.
|
||||
|
||||
After the release of Synapse 1.13.0, we intend to change this default to
|
||||
`develop`, which is the development tip. This is more consistent with common
|
||||
practice and modern `git` usage.
|
||||
|
||||
Although we try to keep `develop` in a stable state, there may be occasions
|
||||
where regressions creep in. Developers and distributors who have scripts which
|
||||
run builds using the default branch of `Synapse` should therefore consider
|
||||
pinning their scripts to `master`.
|
||||
|
||||
|
||||
Synapse 1.12.0rc1 (2020-03-19)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Changes related to room alias management ([MSC2432](https://github.com/matrix-org/matrix-doc/pull/2432)):
|
||||
- Publishing/removing a room from the room directory now requires the user to have a power level capable of modifying the canonical alias, instead of the room aliases. ([\#6965](https://github.com/matrix-org/synapse/issues/6965))
|
||||
- Validate the `alt_aliases` property of canonical alias events. ([\#6971](https://github.com/matrix-org/synapse/issues/6971))
|
||||
- Users with a power level sufficient to modify the canonical alias of a room can now delete room aliases. ([\#6986](https://github.com/matrix-org/synapse/issues/6986))
|
||||
- Implement updated authorization rules and redaction rules for aliases events, from [MSC2261](https://github.com/matrix-org/matrix-doc/pull/2261) and [MSC2432](https://github.com/matrix-org/matrix-doc/pull/2432). ([\#7037](https://github.com/matrix-org/synapse/issues/7037))
|
||||
- Stop sending m.room.aliases events during room creation and upgrade. ([\#6941](https://github.com/matrix-org/synapse/issues/6941))
|
||||
- Synapse no longer uses room alias events to calculate room names for push notifications. ([\#6966](https://github.com/matrix-org/synapse/issues/6966))
|
||||
- The room list endpoint no longer returns a list of aliases. ([\#6970](https://github.com/matrix-org/synapse/issues/6970))
|
||||
- Remove special handling of aliases events from [MSC2260](https://github.com/matrix-org/matrix-doc/pull/2260) added in v1.10.0rc1. ([\#7034](https://github.com/matrix-org/synapse/issues/7034))
|
||||
- Expose the `synctl`, `hash_password` and `generate_config` commands in the snapcraft package. Contributed by @devec0. ([\#6315](https://github.com/matrix-org/synapse/issues/6315))
|
||||
- Check that server_name is correctly set before running database updates. ([\#6982](https://github.com/matrix-org/synapse/issues/6982))
|
||||
- Break down monthly active users by `appservice_id` and emit via Prometheus. ([\#7030](https://github.com/matrix-org/synapse/issues/7030))
|
||||
- Render a configurable and comprehensible error page if something goes wrong during the SAML2 authentication process. ([\#7058](https://github.com/matrix-org/synapse/issues/7058), [\#7067](https://github.com/matrix-org/synapse/issues/7067))
|
||||
- Add an optional parameter to control whether other sessions are logged out when a user's password is modified. ([\#7085](https://github.com/matrix-org/synapse/issues/7085))
|
||||
- Add prometheus metrics for the number of active pushers. ([\#7103](https://github.com/matrix-org/synapse/issues/7103), [\#7106](https://github.com/matrix-org/synapse/issues/7106))
|
||||
- Improve performance when making HTTPS requests to sygnal, sydent, etc, by sharing the SSL context object between connections. ([\#7094](https://github.com/matrix-org/synapse/issues/7094))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- When a user's profile is updated via the admin API, also generate a displayname/avatar update for that user in each room. ([\#6572](https://github.com/matrix-org/synapse/issues/6572))
|
||||
- Fix a couple of bugs in email configuration handling. ([\#6962](https://github.com/matrix-org/synapse/issues/6962))
|
||||
- Fix an issue affecting worker-based deployments where replication would stop working, necessitating a full restart, after joining a large room. ([\#6967](https://github.com/matrix-org/synapse/issues/6967))
|
||||
- Fix `duplicate key` error which was logged when rejoining a room over federation. ([\#6968](https://github.com/matrix-org/synapse/issues/6968))
|
||||
- Prevent user from setting 'deactivated' to anything other than a bool on the v2 PUT /users Admin API. ([\#6990](https://github.com/matrix-org/synapse/issues/6990))
|
||||
- Fix py35-old CI by using native tox package. ([\#7018](https://github.com/matrix-org/synapse/issues/7018))
|
||||
- Fix a bug causing `org.matrix.dummy_event` to be included in responses from `/sync`. ([\#7035](https://github.com/matrix-org/synapse/issues/7035))
|
||||
- Fix a bug that renders UTF-8 text files incorrectly when loaded from media. Contributed by @TheStranjer. ([\#7044](https://github.com/matrix-org/synapse/issues/7044))
|
||||
- Fix a bug that would cause Synapse to respond with an error about event visibility if a client tried to request the state of a room at a given token. ([\#7066](https://github.com/matrix-org/synapse/issues/7066))
|
||||
- Repair a data-corruption issue which was introduced in Synapse 1.10, and fixed in Synapse 1.11, and which could cause `/sync` to return with 404 errors about missing events and unknown rooms. ([\#7070](https://github.com/matrix-org/synapse/issues/7070))
|
||||
- Fix a bug causing account validity renewal emails to be sent even if the feature is turned off in some cases. ([\#7074](https://github.com/matrix-org/synapse/issues/7074))
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Updated CentOS8 install instructions. Contributed by Richard Kellner. ([\#6925](https://github.com/matrix-org/synapse/issues/6925))
|
||||
- Fix `POSTGRES_INITDB_ARGS` in the `contrib/docker/docker-compose.yml` example docker-compose configuration. ([\#6984](https://github.com/matrix-org/synapse/issues/6984))
|
||||
- Change date in [INSTALL.md](./INSTALL.md#tls-certificates) for last date of getting TLS certificates to November 2019. ([\#7015](https://github.com/matrix-org/synapse/issues/7015))
|
||||
- Document that the fallback auth endpoints must be routed to the same worker node as the register endpoints. ([\#7048](https://github.com/matrix-org/synapse/issues/7048))
|
||||
|
||||
|
||||
Deprecations and Removals
|
||||
-------------------------
|
||||
|
||||
- Remove the unused query_auth federation endpoint per [MSC2451](https://github.com/matrix-org/matrix-doc/pull/2451). ([\#7026](https://github.com/matrix-org/synapse/issues/7026))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Add type hints to `logging/context.py`. ([\#6309](https://github.com/matrix-org/synapse/issues/6309))
|
||||
- Add some clarifications to `README.md` in the database schema directory. ([\#6615](https://github.com/matrix-org/synapse/issues/6615))
|
||||
- Refactoring work in preparation for changing the event redaction algorithm. ([\#6874](https://github.com/matrix-org/synapse/issues/6874), [\#6875](https://github.com/matrix-org/synapse/issues/6875), [\#6983](https://github.com/matrix-org/synapse/issues/6983), [\#7003](https://github.com/matrix-org/synapse/issues/7003))
|
||||
- Improve performance of v2 state resolution for large rooms. ([\#6952](https://github.com/matrix-org/synapse/issues/6952), [\#7095](https://github.com/matrix-org/synapse/issues/7095))
|
||||
- Reduce time spent doing GC, by freezing objects on startup. ([\#6953](https://github.com/matrix-org/synapse/issues/6953))
|
||||
- Minor perfermance fixes to `get_auth_chain_ids`. ([\#6954](https://github.com/matrix-org/synapse/issues/6954))
|
||||
- Don't record remote cross-signing keys in the `devices` table. ([\#6956](https://github.com/matrix-org/synapse/issues/6956))
|
||||
- Use flake8-comprehensions to enforce good hygiene of list/set/dict comprehensions. ([\#6957](https://github.com/matrix-org/synapse/issues/6957))
|
||||
- Merge worker apps together. ([\#6964](https://github.com/matrix-org/synapse/issues/6964), [\#7002](https://github.com/matrix-org/synapse/issues/7002), [\#7055](https://github.com/matrix-org/synapse/issues/7055), [\#7104](https://github.com/matrix-org/synapse/issues/7104))
|
||||
- Remove redundant `store_room` call from `FederationHandler._process_received_pdu`. ([\#6979](https://github.com/matrix-org/synapse/issues/6979))
|
||||
- Update warning for incorrect database collation/ctype to include link to documentation. ([\#6985](https://github.com/matrix-org/synapse/issues/6985))
|
||||
- Add some type annotations to the database storage classes. ([\#6987](https://github.com/matrix-org/synapse/issues/6987))
|
||||
- Port `synapse.handlers.presence` to async/await. ([\#6991](https://github.com/matrix-org/synapse/issues/6991), [\#7019](https://github.com/matrix-org/synapse/issues/7019))
|
||||
- Add some type annotations to the federation base & client classes. ([\#6995](https://github.com/matrix-org/synapse/issues/6995))
|
||||
- Port `synapse.rest.keys` to async/await. ([\#7020](https://github.com/matrix-org/synapse/issues/7020))
|
||||
- Add a type check to `is_verified` when processing room keys. ([\#7045](https://github.com/matrix-org/synapse/issues/7045))
|
||||
- Add type annotations and comments to the auth handler. ([\#7063](https://github.com/matrix-org/synapse/issues/7063))
|
||||
|
||||
|
||||
Synapse 1.11.1 (2020-03-03)
|
||||
===========================
|
||||
|
||||
This release includes a security fix impacting installations using Single Sign-On (i.e. SAML2 or CAS) for authentication. Administrators of such installations are encouraged to upgrade as soon as possible.
|
||||
|
||||
The release also includes fixes for a couple of other bugs.
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Add a confirmation step to the SSO login flow before redirecting users to the redirect URL. ([b2bd54a2](https://github.com/matrix-org/synapse/commit/b2bd54a2e31d9a248f73fadb184ae9b4cbdb49f9), [65c73cdf](https://github.com/matrix-org/synapse/commit/65c73cdfec1876a9fec2fd2c3a74923cd146fe0b), [a0178df1](https://github.com/matrix-org/synapse/commit/a0178df10422a76fd403b82d2b2a4ed28a9a9d1e))
|
||||
- Fixed set a user as an admin with the admin API `PUT /_synapse/admin/v2/users/<user_id>`. Contributed by @dklimpel. ([\#6910](https://github.com/matrix-org/synapse/issues/6910))
|
||||
- Fix bug introduced in Synapse 1.11.0 which sometimes caused errors when joining rooms over federation, with `'coroutine' object has no attribute 'event_id'`. ([\#6996](https://github.com/matrix-org/synapse/issues/6996))
|
||||
|
||||
|
||||
Synapse 1.11.0 (2020-02-21)
|
||||
===========================
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Small grammatical fixes to the ACME v1 deprecation notice. ([\#6944](https://github.com/matrix-org/synapse/issues/6944))
|
||||
|
||||
|
||||
Synapse 1.11.0rc1 (2020-02-19)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Admin API to add or modify threepids of user accounts. ([\#6769](https://github.com/matrix-org/synapse/issues/6769))
|
||||
- Limit the number of events that can be requested by the backfill federation API to 100. ([\#6864](https://github.com/matrix-org/synapse/issues/6864))
|
||||
- Add ability to run some group APIs on workers. ([\#6866](https://github.com/matrix-org/synapse/issues/6866))
|
||||
- Reject device display names over 100 characters in length to prevent abuse. ([\#6882](https://github.com/matrix-org/synapse/issues/6882))
|
||||
- Add ability to route federation user device queries to workers. ([\#6873](https://github.com/matrix-org/synapse/issues/6873))
|
||||
- The result of a user directory search can now be filtered via the spam checker. ([\#6888](https://github.com/matrix-org/synapse/issues/6888))
|
||||
- Implement new `GET /_matrix/client/unstable/org.matrix.msc2432/rooms/{roomId}/aliases` endpoint as per [MSC2432](https://github.com/matrix-org/matrix-doc/pull/2432). ([\#6939](https://github.com/matrix-org/synapse/issues/6939), [\#6948](https://github.com/matrix-org/synapse/issues/6948), [\#6949](https://github.com/matrix-org/synapse/issues/6949))
|
||||
- Stop sending `m.room.alias` events wheng adding / removing aliases. Check `alt_aliases` in the latest `m.room.canonical_alias` event when deleting an alias. ([\#6904](https://github.com/matrix-org/synapse/issues/6904))
|
||||
- Change the default power levels of invites, tombstones and server ACLs for new rooms. ([\#6834](https://github.com/matrix-org/synapse/issues/6834))
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fixed third party event rules function `on_create_room`'s return value being ignored. ([\#6781](https://github.com/matrix-org/synapse/issues/6781))
|
||||
- Allow URL-encoded User IDs on `/_synapse/admin/v2/users/<user_id>[/admin]` endpoints. Thanks to @NHAS for reporting. ([\#6825](https://github.com/matrix-org/synapse/issues/6825))
|
||||
- Fix Synapse refusing to start if `federation_certificate_verification_whitelist` option is blank. ([\#6849](https://github.com/matrix-org/synapse/issues/6849))
|
||||
- Fix errors from logging in the purge jobs related to the message retention policies support. ([\#6945](https://github.com/matrix-org/synapse/issues/6945))
|
||||
- Return a 404 instead of 200 for querying information of a non-existant user through the admin API. ([\#6901](https://github.com/matrix-org/synapse/issues/6901))
|
||||
|
||||
|
||||
Updates to the Docker image
|
||||
---------------------------
|
||||
|
||||
- The deprecated "generate-config-on-the-fly" mode is no longer supported. ([\#6918](https://github.com/matrix-org/synapse/issues/6918))
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Add details of PR merge strategy to contributing docs. ([\#6846](https://github.com/matrix-org/synapse/issues/6846))
|
||||
- Spell out that the last event sent to a room won't be deleted by a purge. ([\#6891](https://github.com/matrix-org/synapse/issues/6891))
|
||||
- Update Synapse's documentation to warn about the deprecation of ACME v1. ([\#6905](https://github.com/matrix-org/synapse/issues/6905), [\#6907](https://github.com/matrix-org/synapse/issues/6907), [\#6909](https://github.com/matrix-org/synapse/issues/6909))
|
||||
- Add documentation for the spam checker. ([\#6906](https://github.com/matrix-org/synapse/issues/6906))
|
||||
- Fix worker docs to point `/publicised_groups` API correctly. ([\#6938](https://github.com/matrix-org/synapse/issues/6938))
|
||||
- Clean up and update docs on setting up federation. ([\#6940](https://github.com/matrix-org/synapse/issues/6940))
|
||||
- Add a warning about indentation to generated configuration files. ([\#6920](https://github.com/matrix-org/synapse/issues/6920))
|
||||
- Databases created using the compose file in contrib/docker will now always have correct encoding and locale settings. Contributed by Fridtjof Mund. ([\#6921](https://github.com/matrix-org/synapse/issues/6921))
|
||||
- Update pip install directions in readme to avoid error when using zsh. ([\#6855](https://github.com/matrix-org/synapse/issues/6855))
|
||||
|
||||
|
||||
Deprecations and Removals
|
||||
-------------------------
|
||||
|
||||
- Remove `m.lazy_load_members` from `unstable_features` since lazy loading is in the stable Client-Server API version r0.5.0. ([\#6877](https://github.com/matrix-org/synapse/issues/6877))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Add type hints to `SyncHandler`. ([\#6821](https://github.com/matrix-org/synapse/issues/6821))
|
||||
- Refactoring work in preparation for changing the event redaction algorithm. ([\#6823](https://github.com/matrix-org/synapse/issues/6823), [\#6827](https://github.com/matrix-org/synapse/issues/6827), [\#6854](https://github.com/matrix-org/synapse/issues/6854), [\#6856](https://github.com/matrix-org/synapse/issues/6856), [\#6857](https://github.com/matrix-org/synapse/issues/6857), [\#6858](https://github.com/matrix-org/synapse/issues/6858))
|
||||
- Fix stacktraces when using `ObservableDeferred` and async/await. ([\#6836](https://github.com/matrix-org/synapse/issues/6836))
|
||||
- Port much of `synapse.handlers.federation` to async/await. ([\#6837](https://github.com/matrix-org/synapse/issues/6837), [\#6840](https://github.com/matrix-org/synapse/issues/6840))
|
||||
- Populate `rooms.room_version` database column at startup, rather than in a background update. ([\#6847](https://github.com/matrix-org/synapse/issues/6847))
|
||||
- Reduce amount we log at `INFO` level. ([\#6833](https://github.com/matrix-org/synapse/issues/6833), [\#6862](https://github.com/matrix-org/synapse/issues/6862))
|
||||
- Remove unused `get_room_stats_state` method. ([\#6869](https://github.com/matrix-org/synapse/issues/6869))
|
||||
- Add typing to `synapse.federation.sender` and port to async/await. ([\#6871](https://github.com/matrix-org/synapse/issues/6871))
|
||||
- Refactor `_EventInternalMetadata` object to improve type safety. ([\#6872](https://github.com/matrix-org/synapse/issues/6872))
|
||||
- Add an additional entry to the SyTest blacklist for worker mode. ([\#6883](https://github.com/matrix-org/synapse/issues/6883))
|
||||
- Fix the use of sed in the linting scripts when using BSD sed. ([\#6887](https://github.com/matrix-org/synapse/issues/6887))
|
||||
- Add type hints to the spam checker module. ([\#6915](https://github.com/matrix-org/synapse/issues/6915))
|
||||
- Convert the directory handler tests to use HomeserverTestCase. ([\#6919](https://github.com/matrix-org/synapse/issues/6919))
|
||||
- Increase DB/CPU perf of `_is_server_still_joined` check. ([\#6936](https://github.com/matrix-org/synapse/issues/6936))
|
||||
- Tiny optimisation for incoming HTTP request dispatch. ([\#6950](https://github.com/matrix-org/synapse/issues/6950))
|
||||
|
||||
|
||||
Synapse 1.10.1 (2020-02-17)
|
||||
===========================
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ python 3.6 and to install each tool:
|
||||
|
||||
```
|
||||
# Install the dependencies
|
||||
pip install -U black flake8 isort
|
||||
pip install -U black flake8 flake8-comprehensions isort
|
||||
|
||||
# Run the linter script
|
||||
./scripts-dev/lint.sh
|
||||
|
||||
24
INSTALL.md
24
INSTALL.md
@@ -124,12 +124,21 @@ sudo pacman -S base-devel python python-pip \
|
||||
|
||||
#### CentOS/Fedora
|
||||
|
||||
Installing prerequisites on CentOS 7 or Fedora 25:
|
||||
Installing prerequisites on CentOS 8 or Fedora>26:
|
||||
|
||||
```
|
||||
sudo dnf install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
|
||||
libwebp-devel tk-devel redhat-rpm-config \
|
||||
python3-virtualenv libffi-devel openssl-devel
|
||||
sudo dnf groupinstall "Development Tools"
|
||||
```
|
||||
|
||||
Installing prerequisites on CentOS 7 or Fedora<=25:
|
||||
|
||||
```
|
||||
sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
|
||||
lcms2-devel libwebp-devel tcl-devel tk-devel redhat-rpm-config \
|
||||
python-virtualenv libffi-devel openssl-devel
|
||||
python3-virtualenv libffi-devel openssl-devel
|
||||
sudo yum groupinstall "Development Tools"
|
||||
```
|
||||
|
||||
@@ -418,13 +427,12 @@ so, you will need to edit `homeserver.yaml`, as follows:
|
||||
for having Synapse automatically provision and renew federation
|
||||
certificates through ACME can be found at [ACME.md](docs/ACME.md).
|
||||
Note that, as pointed out in that document, this feature will not
|
||||
work with installs set up after November 2020.
|
||||
work with installs set up after November 2019.
|
||||
|
||||
If you are using your
|
||||
own certificate, be sure to use a `.pem` file that includes the full
|
||||
certificate chain including any intermediate certificates (for
|
||||
instance, if using certbot, use `fullchain.pem` as your certificate,
|
||||
not `cert.pem`).
|
||||
If you are using your own certificate, be sure to use a `.pem` file that
|
||||
includes the full certificate chain including any intermediate certificates
|
||||
(for instance, if using certbot, use `fullchain.pem` as your certificate, not
|
||||
`cert.pem`).
|
||||
|
||||
For a more detailed guide to configuring your server for federation, see
|
||||
[federate.md](docs/federate.md)
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Admin API to add or modify threepids of user accounts.
|
||||
@@ -1 +0,0 @@
|
||||
Fixed third party event rules function `on_create_room`'s return value being ignored.
|
||||
@@ -1 +0,0 @@
|
||||
Add type hints to `SyncHandler`.
|
||||
@@ -1 +0,0 @@
|
||||
Refactoring work in preparation for changing the event redaction algorithm.
|
||||
@@ -1 +0,0 @@
|
||||
Allow URL-encoded User IDs on `/_synapse/admin/v2/users/<user_id>[/admin]` endpoints. Thanks to @NHAS for reporting.
|
||||
@@ -1 +0,0 @@
|
||||
Refactoring work in preparation for changing the event redaction algorithm.
|
||||
@@ -1 +0,0 @@
|
||||
Reducing log level to DEBUG for synapse.storage.TIME.
|
||||
@@ -1 +0,0 @@
|
||||
Change the default power levels of invites, tombstones and server ACLs for new rooms.
|
||||
@@ -1 +0,0 @@
|
||||
Fix stacktraces when using `ObservableDeferred` and async/await.
|
||||
@@ -1 +0,0 @@
|
||||
Port much of `synapse.handlers.federation` to async/await.
|
||||
@@ -1 +0,0 @@
|
||||
Port much of `synapse.handlers.federation` to async/await.
|
||||
@@ -1 +0,0 @@
|
||||
Fix an issue with cross-signing where device signatures were not sent to remote servers.
|
||||
@@ -1 +0,0 @@
|
||||
Add details of PR merge strategy to contributing docs.
|
||||
@@ -1 +0,0 @@
|
||||
Populate `rooms.room_version` database column at startup, rather than in a background update.
|
||||
@@ -1 +0,0 @@
|
||||
Fix Synapse refusing to start if `federation_certificate_verification_whitelist` option is blank.
|
||||
@@ -1 +0,0 @@
|
||||
Refactoring work in preparation for changing the event redaction algorithm.
|
||||
@@ -1 +0,0 @@
|
||||
Update pip install directiosn in readme to avoid error when using zsh.
|
||||
@@ -1 +0,0 @@
|
||||
Refactoring work in preparation for changing the event redaction algorithm.
|
||||
@@ -1 +0,0 @@
|
||||
Refactoring work in preparation for changing the event redaction algorithm.
|
||||
@@ -1 +0,0 @@
|
||||
Refactoring work in preparation for changing the event redaction algorithm.
|
||||
@@ -1 +0,0 @@
|
||||
Reduce amount we log at `INFO` level.
|
||||
@@ -1 +0,0 @@
|
||||
Limit the number of events that can be requested by the backfill federation API to 100.
|
||||
@@ -1 +0,0 @@
|
||||
Add ability to run some group APIs on workers.
|
||||
@@ -1 +0,0 @@
|
||||
Remove unused `get_room_stats_state` method.
|
||||
@@ -1 +0,0 @@
|
||||
Add typing to `synapse.federation.sender` and port to async/await.
|
||||
@@ -1 +0,0 @@
|
||||
Refactor _EventInternalMetadata object to improve type safety.
|
||||
@@ -1 +0,0 @@
|
||||
Add ability to route federation user device queries to workers.
|
||||
@@ -1 +0,0 @@
|
||||
Remove `m.lazy_load_members` from `unstable_features` since lazy loading is in the stable Client-Server API version r0.5.0.
|
||||
@@ -1 +0,0 @@
|
||||
Reject device display names over 100 characters in length.
|
||||
@@ -1 +0,0 @@
|
||||
Add an additional entry to the SyTest blacklist for worker mode.
|
||||
@@ -1 +0,0 @@
|
||||
Fix the use of sed in the linting scripts when using BSD sed.
|
||||
@@ -1 +0,0 @@
|
||||
The result of a user directory search can now be filtered via the spam checker.
|
||||
@@ -1 +0,0 @@
|
||||
Spell out that the last event sent to a room won't be deleted by a purge.
|
||||
@@ -1 +0,0 @@
|
||||
Return a 404 instead of 200 for querying information of a non-existant user through the admin API.
|
||||
@@ -1 +0,0 @@
|
||||
Stop sending alias events during adding / removing aliases. Check alt_aliases in the latest canonical aliases event when deleting an alias.
|
||||
@@ -1 +0,0 @@
|
||||
Update Synapse's documentation to warn about the deprecation of ACME v1.
|
||||
@@ -1 +0,0 @@
|
||||
Add documentation for the spam checker.
|
||||
@@ -1 +0,0 @@
|
||||
Update Synapse's documentation to warn about the deprecation of ACME v1.
|
||||
@@ -1 +0,0 @@
|
||||
Update Synapse's documentation to warn about the deprecation of ACME v1.
|
||||
@@ -1 +0,0 @@
|
||||
Add type hints to the spam checker module.
|
||||
@@ -1 +0,0 @@
|
||||
The deprecated "generate-config-on-the-fly" mode is no longer supported.
|
||||
@@ -1 +0,0 @@
|
||||
Convert the directory handler tests to use HomeserverTestCase.
|
||||
@@ -1 +0,0 @@
|
||||
Add a warning about indentation to generated configuration files.
|
||||
@@ -1 +0,0 @@
|
||||
Databases created using the compose file in contrib/docker will now always have correct encoding and locale settings. Contributed by Fridtjof Mund.
|
||||
@@ -1 +0,0 @@
|
||||
Increase DB/CPU perf of `_is_server_still_joined` check.
|
||||
@@ -1 +0,0 @@
|
||||
Increase perf of `get_auth_chain_ids` used in state res v2.
|
||||
@@ -1 +0,0 @@
|
||||
Fix worker docs to point `/publicised_groups` API correctly.
|
||||
@@ -1 +0,0 @@
|
||||
Implement `GET /_matrix/client/r0/rooms/{roomId}/aliases` endpoint as per [MSC2432](https://github.com/matrix-org/matrix-doc/pull/2432).
|
||||
@@ -1 +0,0 @@
|
||||
Clean up and update docs on setting up federation.
|
||||
@@ -1 +0,0 @@
|
||||
Fix errors from logging in the purge jobs related to the message retention policies support.
|
||||
@@ -1 +0,0 @@
|
||||
Increase perf of `get_auth_chain_ids` used in state res v2.
|
||||
@@ -1 +0,0 @@
|
||||
Implement `GET /_matrix/client/r0/rooms/{roomId}/aliases` endpoint as per [MSC2432](https://github.com/matrix-org/matrix-doc/pull/2432).
|
||||
@@ -1 +0,0 @@
|
||||
Implement `GET /_matrix/client/r0/rooms/{roomId}/aliases` endpoint as per [MSC2432](https://github.com/matrix-org/matrix-doc/pull/2432).
|
||||
@@ -1 +0,0 @@
|
||||
Tiny optimisation for incoming HTTP request dispatch.
|
||||
@@ -1 +0,0 @@
|
||||
Revert #6937.
|
||||
@@ -1 +0,0 @@
|
||||
Minor perf fixes to `get_auth_chain_ids`.
|
||||
@@ -15,10 +15,9 @@ services:
|
||||
restart: unless-stopped
|
||||
# See the readme for a full documentation of the environment settings
|
||||
environment:
|
||||
- SYNAPSE_CONFIG_PATH=/etc/homeserver.yaml
|
||||
- SYNAPSE_CONFIG_PATH=/data/homeserver.yaml
|
||||
volumes:
|
||||
# You may either store all the files in a local folder
|
||||
- ./matrix-config/homeserver.yaml:/etc/homeserver.yaml
|
||||
- ./files:/data
|
||||
# .. or you may split this between different storage points
|
||||
# - ./files:/data
|
||||
@@ -58,7 +57,7 @@ services:
|
||||
- POSTGRES_PASSWORD=changeme
|
||||
# ensure the database gets created correctly
|
||||
# https://github.com/matrix-org/synapse/blob/master/docs/postgres.md#set-up-database
|
||||
- POSTGRES_INITDB_ARGS="--encoding=UTF-8 --lc-collate=C --lc-ctype=C"
|
||||
- POSTGRES_INITDB_ARGS=--encoding=UTF-8 --lc-collate=C --lc-ctype=C
|
||||
volumes:
|
||||
# You may store the database tables in a local folder..
|
||||
- ./schemas:/var/lib/postgresql/data
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Using the Synapse Grafana dashboard
|
||||
|
||||
0. Set up Prometheus and Grafana. Out of scope for this readme. Useful documentation about using Grafana with Prometheus: http://docs.grafana.org/features/datasources/prometheus/
|
||||
1. Have your Prometheus scrape your Synapse. https://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.rst
|
||||
1. Have your Prometheus scrape your Synapse. https://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.md
|
||||
2. Import dashboard into Grafana. Download `synapse.json`. Import it to Grafana and select the correct Prometheus datasource. http://docs.grafana.org/reference/export_import/
|
||||
3. Set up additional recording rules
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
"gnetId": null,
|
||||
"graphTooltip": 0,
|
||||
"id": 1,
|
||||
"iteration": 1561447718159,
|
||||
"iteration": 1584612489167,
|
||||
"links": [
|
||||
{
|
||||
"asDropdown": true,
|
||||
@@ -34,6 +34,7 @@
|
||||
"panels": [
|
||||
{
|
||||
"collapsed": false,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -52,12 +53,14 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 1
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 75,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
@@ -72,7 +75,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -151,6 +156,7 @@
|
||||
"editable": true,
|
||||
"error": false,
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"grid": {},
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
@@ -158,6 +164,7 @@
|
||||
"x": 12,
|
||||
"y": 1
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 33,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
@@ -172,7 +179,9 @@
|
||||
"linewidth": 2,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -302,12 +311,14 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 0,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 10
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 107,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
@@ -322,7 +333,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -425,12 +438,14 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 0,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 19
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 118,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
@@ -445,7 +460,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -542,6 +559,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -1361,6 +1379,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -1732,6 +1751,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -2439,6 +2459,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -2635,6 +2656,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -2650,11 +2672,12 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 61
|
||||
"y": 33
|
||||
},
|
||||
"id": 79,
|
||||
"legend": {
|
||||
@@ -2670,6 +2693,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -2684,8 +2710,13 @@
|
||||
"expr": "sum(rate(synapse_federation_client_sent_transactions{instance=\"$instance\"}[$bucket_size]))",
|
||||
"format": "time_series",
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "txn rate",
|
||||
"legendFormat": "successful txn rate",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "sum(rate(synapse_util_metrics_block_count{block_name=\"_send_new_transaction\",instance=\"$instance\"}[$bucket_size]) - ignoring (block_name) rate(synapse_federation_client_sent_transactions{instance=\"$instance\"}[$bucket_size]))",
|
||||
"legendFormat": "failed txn rate",
|
||||
"refId": "B"
|
||||
}
|
||||
],
|
||||
"thresholds": [],
|
||||
@@ -2736,11 +2767,12 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 61
|
||||
"y": 33
|
||||
},
|
||||
"id": 83,
|
||||
"legend": {
|
||||
@@ -2756,6 +2788,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -2829,11 +2864,12 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 70
|
||||
"y": 42
|
||||
},
|
||||
"id": 109,
|
||||
"legend": {
|
||||
@@ -2849,6 +2885,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -2923,11 +2962,12 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 70
|
||||
"y": 42
|
||||
},
|
||||
"id": 111,
|
||||
"legend": {
|
||||
@@ -2943,6 +2983,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -3009,6 +3052,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -3024,12 +3068,14 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 7,
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 62
|
||||
"y": 34
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 51,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
@@ -3044,6 +3090,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -3112,6 +3161,95 @@
|
||||
"align": false,
|
||||
"alignLevel": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"aliasColors": {},
|
||||
"bars": false,
|
||||
"dashLength": 10,
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"description": "",
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 34
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 134,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
"current": false,
|
||||
"hideZero": false,
|
||||
"max": false,
|
||||
"min": false,
|
||||
"show": true,
|
||||
"total": false,
|
||||
"values": false
|
||||
},
|
||||
"lines": true,
|
||||
"linewidth": 1,
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"percentage": false,
|
||||
"pointradius": 2,
|
||||
"points": false,
|
||||
"renderer": "flot",
|
||||
"seriesOverrides": [],
|
||||
"spaceLength": 10,
|
||||
"stack": false,
|
||||
"steppedLine": false,
|
||||
"targets": [
|
||||
{
|
||||
"expr": "topk(10,synapse_pushers{job=~\"$job\",index=~\"$index\", instance=\"$instance\"})",
|
||||
"legendFormat": "{{kind}} {{app_id}}",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"thresholds": [],
|
||||
"timeFrom": null,
|
||||
"timeRegions": [],
|
||||
"timeShift": null,
|
||||
"title": "Active pusher instances by app",
|
||||
"tooltip": {
|
||||
"shared": false,
|
||||
"sort": 2,
|
||||
"value_type": "individual"
|
||||
},
|
||||
"type": "graph",
|
||||
"xaxis": {
|
||||
"buckets": null,
|
||||
"mode": "time",
|
||||
"name": null,
|
||||
"show": true,
|
||||
"values": []
|
||||
},
|
||||
"yaxes": [
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
},
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
}
|
||||
],
|
||||
"yaxis": {
|
||||
"align": false,
|
||||
"alignLevel": null
|
||||
}
|
||||
}
|
||||
],
|
||||
"repeat": null,
|
||||
@@ -3120,6 +3258,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -3523,6 +3662,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -3540,6 +3680,7 @@
|
||||
"editable": true,
|
||||
"error": false,
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"grid": {},
|
||||
"gridPos": {
|
||||
"h": 13,
|
||||
@@ -3562,6 +3703,9 @@
|
||||
"linewidth": 2,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -3630,6 +3774,7 @@
|
||||
"editable": true,
|
||||
"error": false,
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"grid": {},
|
||||
"gridPos": {
|
||||
"h": 13,
|
||||
@@ -3652,6 +3797,9 @@
|
||||
"linewidth": 2,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -3720,6 +3868,7 @@
|
||||
"editable": true,
|
||||
"error": false,
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"grid": {},
|
||||
"gridPos": {
|
||||
"h": 13,
|
||||
@@ -3742,6 +3891,9 @@
|
||||
"linewidth": 2,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -3810,6 +3962,7 @@
|
||||
"editable": true,
|
||||
"error": false,
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"grid": {},
|
||||
"gridPos": {
|
||||
"h": 13,
|
||||
@@ -3832,6 +3985,9 @@
|
||||
"linewidth": 2,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -3921,6 +4077,7 @@
|
||||
"linewidth": 2,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -4010,6 +4167,7 @@
|
||||
"linewidth": 2,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -4076,6 +4234,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -4540,6 +4699,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -5060,6 +5220,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -5079,7 +5240,7 @@
|
||||
"h": 7,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 67
|
||||
"y": 39
|
||||
},
|
||||
"id": 2,
|
||||
"legend": {
|
||||
@@ -5095,6 +5256,7 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -5198,7 +5360,7 @@
|
||||
"h": 7,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 67
|
||||
"y": 39
|
||||
},
|
||||
"id": 41,
|
||||
"legend": {
|
||||
@@ -5214,6 +5376,7 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -5286,7 +5449,7 @@
|
||||
"h": 7,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 74
|
||||
"y": 46
|
||||
},
|
||||
"id": 42,
|
||||
"legend": {
|
||||
@@ -5302,6 +5465,7 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -5373,7 +5537,7 @@
|
||||
"h": 7,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 74
|
||||
"y": 46
|
||||
},
|
||||
"id": 43,
|
||||
"legend": {
|
||||
@@ -5389,6 +5553,7 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -5460,7 +5625,7 @@
|
||||
"h": 7,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 81
|
||||
"y": 53
|
||||
},
|
||||
"id": 113,
|
||||
"legend": {
|
||||
@@ -5476,6 +5641,7 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -5546,7 +5712,7 @@
|
||||
"h": 7,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 81
|
||||
"y": 53
|
||||
},
|
||||
"id": 115,
|
||||
"legend": {
|
||||
@@ -5562,6 +5728,7 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "null",
|
||||
"options": {},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -5573,7 +5740,7 @@
|
||||
"steppedLine": false,
|
||||
"targets": [
|
||||
{
|
||||
"expr": "rate(synapse_replication_tcp_protocol_close_reason{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
|
||||
"expr": "rate(synapse_replication_tcp_protocol_close_reason{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
|
||||
"format": "time_series",
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "{{job}}-{{index}} {{reason_type}}",
|
||||
@@ -5628,6 +5795,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -5643,11 +5811,12 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 13
|
||||
"y": 40
|
||||
},
|
||||
"id": 67,
|
||||
"legend": {
|
||||
@@ -5663,7 +5832,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "connected",
|
||||
"options": {},
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -5679,7 +5850,7 @@
|
||||
"format": "time_series",
|
||||
"interval": "",
|
||||
"intervalFactor": 1,
|
||||
"legendFormat": "{{job}}-{{index}} ",
|
||||
"legendFormat": "{{job}}-{{index}} {{name}}",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
@@ -5731,11 +5902,12 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 13
|
||||
"y": 40
|
||||
},
|
||||
"id": 71,
|
||||
"legend": {
|
||||
@@ -5751,7 +5923,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "connected",
|
||||
"options": {},
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -5819,11 +5993,12 @@
|
||||
"dashes": false,
|
||||
"datasource": "$datasource",
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 22
|
||||
"y": 49
|
||||
},
|
||||
"id": 121,
|
||||
"interval": "",
|
||||
@@ -5840,7 +6015,9 @@
|
||||
"linewidth": 1,
|
||||
"links": [],
|
||||
"nullPointMode": "connected",
|
||||
"options": {},
|
||||
"options": {
|
||||
"dataLinks": []
|
||||
},
|
||||
"paceLength": 10,
|
||||
"percentage": false,
|
||||
"pointradius": 5,
|
||||
@@ -5909,6 +6086,7 @@
|
||||
},
|
||||
{
|
||||
"collapsed": true,
|
||||
"datasource": null,
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
@@ -6607,7 +6785,7 @@
|
||||
}
|
||||
],
|
||||
"refresh": "5m",
|
||||
"schemaVersion": 18,
|
||||
"schemaVersion": 22,
|
||||
"style": "dark",
|
||||
"tags": [
|
||||
"matrix"
|
||||
@@ -6616,7 +6794,7 @@
|
||||
"list": [
|
||||
{
|
||||
"current": {
|
||||
"tags": [],
|
||||
"selected": true,
|
||||
"text": "Prometheus",
|
||||
"value": "Prometheus"
|
||||
},
|
||||
@@ -6638,6 +6816,7 @@
|
||||
"auto_count": 100,
|
||||
"auto_min": "30s",
|
||||
"current": {
|
||||
"selected": false,
|
||||
"text": "auto",
|
||||
"value": "$__auto_interval_bucket_size"
|
||||
},
|
||||
@@ -6719,9 +6898,9 @@
|
||||
"allFormat": "regex wildcard",
|
||||
"allValue": "",
|
||||
"current": {
|
||||
"text": "All",
|
||||
"text": "synapse",
|
||||
"value": [
|
||||
"$__all"
|
||||
"synapse"
|
||||
]
|
||||
},
|
||||
"datasource": "$datasource",
|
||||
@@ -6751,7 +6930,9 @@
|
||||
"allValue": ".*",
|
||||
"current": {
|
||||
"text": "All",
|
||||
"value": "$__all"
|
||||
"value": [
|
||||
"$__all"
|
||||
]
|
||||
},
|
||||
"datasource": "$datasource",
|
||||
"definition": "",
|
||||
@@ -6810,5 +6991,5 @@
|
||||
"timezone": "",
|
||||
"title": "Synapse",
|
||||
"uid": "000000012",
|
||||
"version": 10
|
||||
"version": 19
|
||||
}
|
||||
18
debian/changelog
vendored
18
debian/changelog
vendored
@@ -1,3 +1,21 @@
|
||||
matrix-synapse-py3 (1.12.0) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.12.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Mon, 23 Mar 2020 12:13:03 +0000
|
||||
|
||||
matrix-synapse-py3 (1.11.1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.11.1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 03 Mar 2020 15:01:22 +0000
|
||||
|
||||
matrix-synapse-py3 (1.11.0) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.11.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Fri, 21 Feb 2020 08:54:34 +0000
|
||||
|
||||
matrix-synapse-py3 (1.10.1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.10.1.
|
||||
|
||||
@@ -38,6 +38,7 @@ The parameter ``threepids`` is optional.
|
||||
The parameter ``avatar_url`` is optional.
|
||||
The parameter ``admin`` is optional and defaults to 'false'.
|
||||
The parameter ``deactivated`` is optional and defaults to 'false'.
|
||||
The parameter ``password`` is optional. If provided the user's password is updated and all devices are logged out.
|
||||
If the user already exists then optional parameters default to the current value.
|
||||
|
||||
List Accounts
|
||||
@@ -168,11 +169,14 @@ with a body of:
|
||||
.. code:: json
|
||||
|
||||
{
|
||||
"new_password": "<secret>"
|
||||
"new_password": "<secret>",
|
||||
"logout_devices": true,
|
||||
}
|
||||
|
||||
including an ``access_token`` of a server admin.
|
||||
|
||||
The parameter ``new_password`` is required.
|
||||
The parameter ``logout_devices`` is optional and defaults to ``true``.
|
||||
|
||||
Get whether a user is a server administrator or not
|
||||
===================================================
|
||||
|
||||
@@ -30,7 +30,7 @@ The necessary tools are detailed below.
|
||||
|
||||
Install `flake8` with:
|
||||
|
||||
pip install --upgrade flake8
|
||||
pip install --upgrade flake8 flake8-comprehensions
|
||||
|
||||
Check all application and test code with:
|
||||
|
||||
|
||||
@@ -1347,6 +1347,25 @@ saml2_config:
|
||||
#
|
||||
#grandfathered_mxid_source_attribute: upn
|
||||
|
||||
# Directory in which Synapse will try to find the template files below.
|
||||
# If not set, default templates from within the Synapse package will be used.
|
||||
#
|
||||
# DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates.
|
||||
# If you *do* uncomment it, you will need to make sure that all the templates
|
||||
# below are in the directory.
|
||||
#
|
||||
# Synapse will look for the following templates in this directory:
|
||||
#
|
||||
# * HTML page to display to users if something goes wrong during the
|
||||
# authentication process: 'saml_error.html'.
|
||||
#
|
||||
# This template doesn't currently need any variable to render.
|
||||
#
|
||||
# You can see the default templates at:
|
||||
# https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
|
||||
#
|
||||
#template_dir: "res/templates"
|
||||
|
||||
|
||||
|
||||
# Enable CAS for registration and login.
|
||||
@@ -1360,6 +1379,56 @@ saml2_config:
|
||||
# # name: value
|
||||
|
||||
|
||||
# Additional settings to use with single-sign on systems such as SAML2 and CAS.
|
||||
#
|
||||
sso:
|
||||
# A list of client URLs which are whitelisted so that the user does not
|
||||
# have to confirm giving access to their account to the URL. Any client
|
||||
# whose URL starts with an entry in the following list will not be subject
|
||||
# to an additional confirmation step after the SSO login is completed.
|
||||
#
|
||||
# WARNING: An entry such as "https://my.client" is insecure, because it
|
||||
# will also match "https://my.client.evil.site", exposing your users to
|
||||
# phishing attacks from evil.site. To avoid this, include a slash after the
|
||||
# hostname: "https://my.client/".
|
||||
#
|
||||
# By default, this list is empty.
|
||||
#
|
||||
#client_whitelist:
|
||||
# - https://riot.im/develop
|
||||
# - https://my.custom.client/
|
||||
|
||||
# Directory in which Synapse will try to find the template files below.
|
||||
# If not set, default templates from within the Synapse package will be used.
|
||||
#
|
||||
# DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates.
|
||||
# If you *do* uncomment it, you will need to make sure that all the templates
|
||||
# below are in the directory.
|
||||
#
|
||||
# Synapse will look for the following templates in this directory:
|
||||
#
|
||||
# * HTML page for a confirmation step before redirecting back to the client
|
||||
# with the login token: 'sso_redirect_confirm.html'.
|
||||
#
|
||||
# When rendering, this template is given three variables:
|
||||
# * redirect_url: the URL the user is about to be redirected to. Needs
|
||||
# manual escaping (see
|
||||
# https://jinja.palletsprojects.com/en/2.11.x/templates/#html-escaping).
|
||||
#
|
||||
# * display_url: the same as `redirect_url`, but with the query
|
||||
# parameters stripped. The intention is to have a
|
||||
# human-readable URL to show to users, not to use it as
|
||||
# the final address to redirect to. Needs manual escaping
|
||||
# (see https://jinja.palletsprojects.com/en/2.11.x/templates/#html-escaping).
|
||||
#
|
||||
# * server_name: the homeserver's name.
|
||||
#
|
||||
# You can see the default templates at:
|
||||
# https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
|
||||
#
|
||||
#template_dir: "res/templates"
|
||||
|
||||
|
||||
# The JWT needs to contain a globally unique "sub" (subject) claim.
|
||||
#
|
||||
#jwt_config:
|
||||
@@ -1409,10 +1478,6 @@ email:
|
||||
#
|
||||
#require_transport_security: true
|
||||
|
||||
# Enable sending emails for messages that the user has missed
|
||||
#
|
||||
#enable_notifs: false
|
||||
|
||||
# notif_from defines the "From" address to use when sending emails.
|
||||
# It must be set if email sending is enabled.
|
||||
#
|
||||
@@ -1430,6 +1495,11 @@ email:
|
||||
#
|
||||
#app_name: my_branded_matrix_server
|
||||
|
||||
# Uncomment the following to enable sending emails for messages that the user
|
||||
# has missed. Disabled by default.
|
||||
#
|
||||
#enable_notifs: true
|
||||
|
||||
# Uncomment the following to disable automatic subscription to email
|
||||
# notifications for new users. Enabled by default.
|
||||
#
|
||||
|
||||
@@ -273,6 +273,7 @@ Additionally, the following REST endpoints can be handled, but all requests must
|
||||
be routed to the same instance:
|
||||
|
||||
^/_matrix/client/(r0|unstable)/register$
|
||||
^/_matrix/client/(r0|unstable)/auth/.*/fallback/web$
|
||||
|
||||
Pagination requests can also be handled, but all requests with the same path
|
||||
room must be routed to the same instance. Additionally, care must be taken to
|
||||
|
||||
@@ -103,7 +103,7 @@ def main():
|
||||
|
||||
yaml.safe_dump(result, sys.stdout, default_flow_style=False)
|
||||
|
||||
rows = list(row for server, json in result.items() for row in rows_v2(server, json))
|
||||
rows = [row for server, json in result.items() for row in rows_v2(server, json)]
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.executemany(
|
||||
|
||||
@@ -1,20 +1,31 @@
|
||||
name: matrix-synapse
|
||||
base: core18
|
||||
version: git
|
||||
version: git
|
||||
summary: Reference Matrix homeserver
|
||||
description: |
|
||||
Synapse is the reference Matrix homeserver.
|
||||
Matrix is a federated and decentralised instant messaging and VoIP system.
|
||||
|
||||
grade: stable
|
||||
confinement: strict
|
||||
grade: stable
|
||||
confinement: strict
|
||||
|
||||
apps:
|
||||
matrix-synapse:
|
||||
matrix-synapse:
|
||||
command: synctl --no-daemonize start $SNAP_COMMON/homeserver.yaml
|
||||
stop-command: synctl -c $SNAP_COMMON stop
|
||||
plugs: [network-bind, network]
|
||||
daemon: simple
|
||||
daemon: simple
|
||||
hash-password:
|
||||
command: hash_password
|
||||
generate-config:
|
||||
command: generate_config
|
||||
generate-signing-key:
|
||||
command: generate_signing_key.py
|
||||
register-new-matrix-user:
|
||||
command: register_new_matrix_user
|
||||
plugs: [network]
|
||||
synctl:
|
||||
command: synctl
|
||||
parts:
|
||||
matrix-synapse:
|
||||
source: .
|
||||
|
||||
@@ -36,7 +36,7 @@ try:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.10.1"
|
||||
__version__ = "1.12.0"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
||||
@@ -538,13 +538,13 @@ class Auth(object):
|
||||
return defer.succeed(auth_ids)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_can_change_room_list(self, room_id, user):
|
||||
"""Check if the user is allowed to edit the room's entry in the
|
||||
def check_can_change_room_list(self, room_id: str, user: UserID):
|
||||
"""Determine whether the user is allowed to edit the room's entry in the
|
||||
published room list.
|
||||
|
||||
Args:
|
||||
room_id (str)
|
||||
user (UserID)
|
||||
room_id
|
||||
user
|
||||
"""
|
||||
|
||||
is_admin = yield self.is_server_admin(user)
|
||||
@@ -556,7 +556,7 @@ class Auth(object):
|
||||
|
||||
# We currently require the user is a "moderator" in the room. We do this
|
||||
# by checking if they would (theoretically) be able to change the
|
||||
# m.room.aliases events
|
||||
# m.room.canonical_alias events
|
||||
power_level_event = yield self.state.get_current_state(
|
||||
room_id, EventTypes.PowerLevels, ""
|
||||
)
|
||||
@@ -566,16 +566,11 @@ class Auth(object):
|
||||
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
|
||||
|
||||
send_level = event_auth.get_send_level(
|
||||
EventTypes.Aliases, "", power_level_event
|
||||
EventTypes.CanonicalAlias, "", power_level_event
|
||||
)
|
||||
user_level = event_auth.get_user_power_level(user_id, auth_events)
|
||||
|
||||
if user_level < send_level:
|
||||
raise AuthError(
|
||||
403,
|
||||
"This server requires you to be a moderator in the room to"
|
||||
" edit its room list entry",
|
||||
)
|
||||
return user_level >= send_level
|
||||
|
||||
@staticmethod
|
||||
def has_access_token(request):
|
||||
|
||||
@@ -66,6 +66,7 @@ class Codes(object):
|
||||
EXPIRED_ACCOUNT = "ORG_MATRIX_EXPIRED_ACCOUNT"
|
||||
INVALID_SIGNATURE = "M_INVALID_SIGNATURE"
|
||||
USER_DEACTIVATED = "M_USER_DEACTIVATED"
|
||||
BAD_ALIAS = "M_BAD_ALIAS"
|
||||
|
||||
|
||||
class CodeMessageException(RuntimeError):
|
||||
|
||||
@@ -57,7 +57,7 @@ class RoomVersion(object):
|
||||
state_res = attr.ib() # int; one of the StateResolutionVersions
|
||||
enforce_key_validity = attr.ib() # bool
|
||||
|
||||
# bool: before MSC2260, anyone was allowed to send an aliases event
|
||||
# bool: before MSC2261/MSC2432, m.room.aliases had special auth rules and redaction rules
|
||||
special_case_aliases_auth = attr.ib(type=bool, default=False)
|
||||
|
||||
|
||||
@@ -102,12 +102,13 @@ class RoomVersions(object):
|
||||
enforce_key_validity=True,
|
||||
special_case_aliases_auth=True,
|
||||
)
|
||||
MSC2260_DEV = RoomVersion(
|
||||
"org.matrix.msc2260",
|
||||
MSC2432_DEV = RoomVersion(
|
||||
"org.matrix.msc2432",
|
||||
RoomDisposition.UNSTABLE,
|
||||
EventFormatVersions.V3,
|
||||
StateResolutionVersions.V2,
|
||||
enforce_key_validity=True,
|
||||
special_case_aliases_auth=False,
|
||||
)
|
||||
|
||||
|
||||
@@ -119,6 +120,6 @@ KNOWN_ROOM_VERSIONS = {
|
||||
RoomVersions.V3,
|
||||
RoomVersions.V4,
|
||||
RoomVersions.V5,
|
||||
RoomVersions.MSC2260_DEV,
|
||||
RoomVersions.MSC2432_DEV,
|
||||
)
|
||||
} # type: Dict[str, RoomVersion]
|
||||
|
||||
@@ -141,7 +141,7 @@ def start_reactor(
|
||||
|
||||
def quit_with_error(error_string):
|
||||
message_lines = error_string.split("\n")
|
||||
line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
|
||||
line_length = max(len(l) for l in message_lines if len(l) < 80) + 2
|
||||
sys.stderr.write("*" * line_length + "\n")
|
||||
for line in message_lines:
|
||||
sys.stderr.write(" %s\n" % (line.rstrip(),))
|
||||
@@ -276,9 +276,19 @@ def start(hs, listeners=None):
|
||||
# It is now safe to start your Synapse.
|
||||
hs.start_listening(listeners)
|
||||
hs.get_datastore().db.start_profiling()
|
||||
hs.get_pusherpool().start()
|
||||
|
||||
setup_sentry(hs)
|
||||
setup_sdnotify(hs)
|
||||
|
||||
# We now freeze all allocated objects in the hopes that (almost)
|
||||
# everything currently allocated are things that will be used for the
|
||||
# rest of time. Doing so means less work each GC (hopefully).
|
||||
#
|
||||
# This only works on Python 3.7
|
||||
if sys.version_info >= (3, 7):
|
||||
gc.collect()
|
||||
gc.freeze()
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
reactor = hs.get_reactor()
|
||||
|
||||
@@ -13,161 +13,11 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import sys
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
from synapse import events
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.server import HomeServer
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.appservice")
|
||||
|
||||
|
||||
class AppserviceSlaveStore(
|
||||
DirectoryStore,
|
||||
SlavedEventStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class AppserviceServer(HomeServer):
|
||||
DATASTORE_CLASS = AppserviceSlaveStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
)
|
||||
|
||||
logger.info("Synapse appservice now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return ASReplicationHandler(self)
|
||||
|
||||
|
||||
class ASReplicationHandler(ReplicationClientHandler):
|
||||
def __init__(self, hs):
|
||||
super(ASReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
|
||||
async def on_rdata(self, stream_name, token, rows):
|
||||
await super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
|
||||
if stream_name == "events":
|
||||
max_stream_id = self.store.get_room_max_stream_ordering()
|
||||
run_in_background(self._notify_app_services, max_stream_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _notify_app_services(self, room_stream_id):
|
||||
try:
|
||||
yield self.appservice_handler.notify_interested_services(room_stream_id)
|
||||
except Exception:
|
||||
logger.exception("Error notifying application services of event")
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config("Synapse appservice", config_options)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.appservice"
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
if config.notify_appservices:
|
||||
sys.stderr.write(
|
||||
"\nThe appservices must be disabled in the main synapse process"
|
||||
"\nbefore they can be run in a separate worker."
|
||||
"\nPlease add ``notify_appservices: false`` to the main config"
|
||||
"\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Force the pushers to start since they will be disabled in the main config
|
||||
config.notify_appservices = True
|
||||
|
||||
ps = AppserviceServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
)
|
||||
|
||||
setup_logging(ps, config, use_worker_options=True)
|
||||
|
||||
ps.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ps, config.worker_listeners
|
||||
)
|
||||
|
||||
_base.start_worker_reactor("synapse-appservice", config)
|
||||
|
||||
from synapse.app.generic_worker import start
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
|
||||
@@ -13,195 +13,11 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import sys
|
||||
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
from synapse import events
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
|
||||
from synapse.replication.slave.storage.keys import SlavedKeyStore
|
||||
from synapse.replication.slave.storage.profile import SlavedProfileStore
|
||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.login import LoginRestServlet
|
||||
from synapse.rest.client.v1.push_rule import PushRuleRestServlet
|
||||
from synapse.rest.client.v1.room import (
|
||||
JoinedRoomMemberListRestServlet,
|
||||
PublicRoomListRestServlet,
|
||||
RoomEventContextServlet,
|
||||
RoomMemberListRestServlet,
|
||||
RoomMessageListRestServlet,
|
||||
RoomStateRestServlet,
|
||||
)
|
||||
from synapse.rest.client.v1.voip import VoipRestServlet
|
||||
from synapse.rest.client.v2_alpha import groups
|
||||
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
|
||||
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
|
||||
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
|
||||
from synapse.rest.client.versions import VersionsRestServlet
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.client_reader")
|
||||
|
||||
|
||||
class ClientReaderSlavedStore(
|
||||
SlavedDeviceInboxStore,
|
||||
SlavedDeviceStore,
|
||||
SlavedReceiptsStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedGroupServerStore,
|
||||
SlavedAccountDataStore,
|
||||
SlavedEventStore,
|
||||
SlavedKeyStore,
|
||||
RoomStore,
|
||||
DirectoryStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedTransactionStore,
|
||||
SlavedProfileStore,
|
||||
SlavedClientIpStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class ClientReaderServer(HomeServer):
|
||||
DATASTORE_CLASS = ClientReaderSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
|
||||
PublicRoomListRestServlet(self).register(resource)
|
||||
RoomMemberListRestServlet(self).register(resource)
|
||||
JoinedRoomMemberListRestServlet(self).register(resource)
|
||||
RoomStateRestServlet(self).register(resource)
|
||||
RoomEventContextServlet(self).register(resource)
|
||||
RoomMessageListRestServlet(self).register(resource)
|
||||
RegisterRestServlet(self).register(resource)
|
||||
LoginRestServlet(self).register(resource)
|
||||
ThreepidRestServlet(self).register(resource)
|
||||
KeyQueryServlet(self).register(resource)
|
||||
KeyChangesServlet(self).register(resource)
|
||||
VoipRestServlet(self).register(resource)
|
||||
PushRuleRestServlet(self).register(resource)
|
||||
VersionsRestServlet(self).register(resource)
|
||||
|
||||
groups.register_servlets(self, resource)
|
||||
|
||||
resources.update({"/_matrix/client": resource})
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
)
|
||||
|
||||
logger.info("Synapse client reader now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return ReplicationClientHandler(self.get_datastore())
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config("Synapse client reader", config_options)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.client_reader"
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
ss = ClientReaderServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
)
|
||||
|
||||
_base.start_worker_reactor("synapse-client-reader", config)
|
||||
|
||||
from synapse.app.generic_worker import start
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
|
||||
@@ -13,191 +13,11 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import sys
|
||||
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
from synapse import events
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.profile import SlavedProfileStore
|
||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.profile import (
|
||||
ProfileAvatarURLRestServlet,
|
||||
ProfileDisplaynameRestServlet,
|
||||
ProfileRestServlet,
|
||||
)
|
||||
from synapse.rest.client.v1.room import (
|
||||
JoinRoomAliasServlet,
|
||||
RoomMembershipRestServlet,
|
||||
RoomSendEventRestServlet,
|
||||
RoomStateEventRestServlet,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.event_creator")
|
||||
|
||||
|
||||
class EventCreatorSlavedStore(
|
||||
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
|
||||
# rather than going via the correct worker.
|
||||
UserDirectoryStore,
|
||||
DirectoryStore,
|
||||
SlavedTransactionStore,
|
||||
SlavedProfileStore,
|
||||
SlavedAccountDataStore,
|
||||
SlavedPusherStore,
|
||||
SlavedReceiptsStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedDeviceStore,
|
||||
SlavedClientIpStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedEventStore,
|
||||
SlavedRegistrationStore,
|
||||
RoomStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class EventCreatorServer(HomeServer):
|
||||
DATASTORE_CLASS = EventCreatorSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
RoomSendEventRestServlet(self).register(resource)
|
||||
RoomMembershipRestServlet(self).register(resource)
|
||||
RoomStateEventRestServlet(self).register(resource)
|
||||
JoinRoomAliasServlet(self).register(resource)
|
||||
ProfileAvatarURLRestServlet(self).register(resource)
|
||||
ProfileDisplaynameRestServlet(self).register(resource)
|
||||
ProfileRestServlet(self).register(resource)
|
||||
resources.update(
|
||||
{
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
"/_matrix/client/v2_alpha": resource,
|
||||
"/_matrix/client/api/v1": resource,
|
||||
}
|
||||
)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
)
|
||||
|
||||
logger.info("Synapse event creator now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return ReplicationClientHandler(self.get_datastore())
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config("Synapse event creator", config_options)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.event_creator"
|
||||
|
||||
assert config.worker_replication_http_port is not None
|
||||
|
||||
# This should only be done on the user directory worker or the master
|
||||
config.update_user_directory = False
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
ss = EventCreatorServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
)
|
||||
|
||||
_base.start_worker_reactor("synapse-event-creator", config)
|
||||
|
||||
from synapse.app.generic_worker import start
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
|
||||
@@ -13,177 +13,11 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import sys
|
||||
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
from synapse import events
|
||||
from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.federation.transport.server import TransportLayerServer
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
|
||||
from synapse.replication.slave.storage.keys import SlavedKeyStore
|
||||
from synapse.replication.slave.storage.profile import SlavedProfileStore
|
||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.key.v2 import KeyApiV2Resource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.federation_reader")
|
||||
|
||||
|
||||
class FederationReaderSlavedStore(
|
||||
SlavedAccountDataStore,
|
||||
SlavedProfileStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedPusherStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedReceiptsStore,
|
||||
SlavedEventStore,
|
||||
SlavedKeyStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedGroupServerStore,
|
||||
SlavedDeviceStore,
|
||||
RoomStore,
|
||||
DirectoryStore,
|
||||
SlavedTransactionStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class FederationReaderServer(HomeServer):
|
||||
DATASTORE_CLASS = FederationReaderSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
elif name == "federation":
|
||||
resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
|
||||
if name == "openid" and "federation" not in res["names"]:
|
||||
# Only load the openid resource separately if federation resource
|
||||
# is not specified since federation resource includes openid
|
||||
# resource.
|
||||
resources.update(
|
||||
{
|
||||
FEDERATION_PREFIX: TransportLayerServer(
|
||||
self, servlet_groups=["openid"]
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
if name in ["keys", "federation"]:
|
||||
resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
reactor=self.get_reactor(),
|
||||
)
|
||||
|
||||
logger.info("Synapse federation reader now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return ReplicationClientHandler(self.get_datastore())
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config(
|
||||
"Synapse federation reader", config_options
|
||||
)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.federation_reader"
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
ss = FederationReaderServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
)
|
||||
|
||||
_base.start_worker_reactor("synapse-federation-reader", config)
|
||||
|
||||
from synapse.app.generic_worker import start
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
|
||||
@@ -13,308 +13,11 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import sys
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
from synapse import events
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.federation import send_queue
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.presence import SlavedPresenceStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.replication.tcp.streams._base import (
|
||||
DeviceListsStream,
|
||||
ReceiptsStream,
|
||||
ToDeviceStream,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.database import Database
|
||||
from synapse.types import ReadReceipt
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.federation_sender")
|
||||
|
||||
|
||||
class FederationSenderSlaveStore(
|
||||
SlavedDeviceInboxStore,
|
||||
SlavedTransactionStore,
|
||||
SlavedReceiptsStore,
|
||||
SlavedEventStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedDeviceStore,
|
||||
SlavedPresenceStore,
|
||||
):
|
||||
def __init__(self, database: Database, db_conn, hs):
|
||||
super(FederationSenderSlaveStore, self).__init__(database, db_conn, hs)
|
||||
|
||||
# We pull out the current federation stream position now so that we
|
||||
# always have a known value for the federation position in memory so
|
||||
# that we don't have to bounce via a deferred once when we start the
|
||||
# replication streams.
|
||||
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
|
||||
|
||||
def _get_federation_out_pos(self, db_conn):
|
||||
sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
|
||||
sql = self.database_engine.convert_param_style(sql)
|
||||
|
||||
txn = db_conn.cursor()
|
||||
txn.execute(sql, ("federation",))
|
||||
rows = txn.fetchall()
|
||||
txn.close()
|
||||
|
||||
return rows[0][0] if rows else -1
|
||||
|
||||
|
||||
class FederationSenderServer(HomeServer):
|
||||
DATASTORE_CLASS = FederationSenderSlaveStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
)
|
||||
|
||||
logger.info("Synapse federation_sender now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return FederationSenderReplicationHandler(self)
|
||||
|
||||
|
||||
class FederationSenderReplicationHandler(ReplicationClientHandler):
|
||||
def __init__(self, hs):
|
||||
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.send_handler = FederationSenderHandler(hs, self)
|
||||
|
||||
async def on_rdata(self, stream_name, token, rows):
|
||||
await super(FederationSenderReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
self.send_handler.process_replication_rows(stream_name, token, rows)
|
||||
|
||||
def get_streams_to_replicate(self):
|
||||
args = super(
|
||||
FederationSenderReplicationHandler, self
|
||||
).get_streams_to_replicate()
|
||||
args.update(self.send_handler.stream_positions())
|
||||
return args
|
||||
|
||||
def on_remote_server_up(self, server: str):
|
||||
"""Called when get a new REMOTE_SERVER_UP command."""
|
||||
|
||||
# Let's wake up the transaction queue for the server in case we have
|
||||
# pending stuff to send to it.
|
||||
self.send_handler.wake_destination(server)
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config(
|
||||
"Synapse federation sender", config_options
|
||||
)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.federation_sender"
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
if config.send_federation:
|
||||
sys.stderr.write(
|
||||
"\nThe send_federation must be disabled in the main synapse process"
|
||||
"\nbefore they can be run in a separate worker."
|
||||
"\nPlease add ``send_federation: false`` to the main config"
|
||||
"\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Force the pushers to start since they will be disabled in the main config
|
||||
config.send_federation = True
|
||||
|
||||
ss = FederationSenderServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
)
|
||||
|
||||
_base.start_worker_reactor("synapse-federation-sender", config)
|
||||
|
||||
|
||||
class FederationSenderHandler(object):
|
||||
"""Processes the replication stream and forwards the appropriate entries
|
||||
to the federation sender.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: FederationSenderServer, replication_client):
|
||||
self.store = hs.get_datastore()
|
||||
self._is_mine_id = hs.is_mine_id
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
self.replication_client = replication_client
|
||||
|
||||
self.federation_position = self.store.federation_out_pos_startup
|
||||
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
|
||||
|
||||
self._last_ack = self.federation_position
|
||||
|
||||
self._room_serials = {}
|
||||
self._room_typing = {}
|
||||
|
||||
def on_start(self):
|
||||
# There may be some events that are persisted but haven't been sent,
|
||||
# so send them now.
|
||||
self.federation_sender.notify_new_events(
|
||||
self.store.get_room_max_stream_ordering()
|
||||
)
|
||||
|
||||
def wake_destination(self, server: str):
|
||||
self.federation_sender.wake_destination(server)
|
||||
|
||||
def stream_positions(self):
|
||||
return {"federation": self.federation_position}
|
||||
|
||||
def process_replication_rows(self, stream_name, token, rows):
|
||||
# The federation stream contains things that we want to send out, e.g.
|
||||
# presence, typing, etc.
|
||||
if stream_name == "federation":
|
||||
send_queue.process_rows_for_federation(self.federation_sender, rows)
|
||||
run_in_background(self.update_token, token)
|
||||
|
||||
# We also need to poke the federation sender when new events happen
|
||||
elif stream_name == "events":
|
||||
self.federation_sender.notify_new_events(token)
|
||||
|
||||
# ... and when new receipts happen
|
||||
elif stream_name == ReceiptsStream.NAME:
|
||||
run_as_background_process(
|
||||
"process_receipts_for_federation", self._on_new_receipts, rows
|
||||
)
|
||||
|
||||
# ... as well as device updates and messages
|
||||
elif stream_name == DeviceListsStream.NAME:
|
||||
hosts = set(row.destination for row in rows)
|
||||
for host in hosts:
|
||||
self.federation_sender.send_device_messages(host)
|
||||
|
||||
elif stream_name == ToDeviceStream.NAME:
|
||||
# The to_device stream includes stuff to be pushed to both local
|
||||
# clients and remote servers, so we ignore entities that start with
|
||||
# '@' (since they'll be local users rather than destinations).
|
||||
hosts = set(row.entity for row in rows if not row.entity.startswith("@"))
|
||||
for host in hosts:
|
||||
self.federation_sender.send_device_messages(host)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_new_receipts(self, rows):
|
||||
"""
|
||||
Args:
|
||||
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
|
||||
new receipts to be processed
|
||||
"""
|
||||
for receipt in rows:
|
||||
# we only want to send on receipts for our own users
|
||||
if not self._is_mine_id(receipt.user_id):
|
||||
continue
|
||||
receipt_info = ReadReceipt(
|
||||
receipt.room_id,
|
||||
receipt.receipt_type,
|
||||
receipt.user_id,
|
||||
[receipt.event_id],
|
||||
receipt.data,
|
||||
)
|
||||
yield self.federation_sender.send_read_receipt(receipt_info)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_token(self, token):
|
||||
try:
|
||||
self.federation_position = token
|
||||
|
||||
# We linearize here to ensure we don't have races updating the token
|
||||
with (yield self._fed_position_linearizer.queue(None)):
|
||||
if self._last_ack < self.federation_position:
|
||||
yield self.store.update_federation_out_pos(
|
||||
"federation", self.federation_position
|
||||
)
|
||||
|
||||
# We ACK this token over replication so that the master can drop
|
||||
# its in memory queues
|
||||
self.replication_client.send_federation_ack(
|
||||
self.federation_position
|
||||
)
|
||||
self._last_ack = self.federation_position
|
||||
except Exception:
|
||||
logger.exception("Error updating federation stream position")
|
||||
|
||||
from synapse.app.generic_worker import start
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
|
||||
@@ -13,241 +13,11 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import sys
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
from synapse import events
|
||||
from synapse.api.errors import HttpResponseException, SynapseError
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v2_alpha._base import client_patterns
|
||||
from synapse.server import HomeServer
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.frontend_proxy")
|
||||
|
||||
|
||||
class PresenceStatusStubServlet(RestServlet):
|
||||
PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(PresenceStatusStubServlet, self).__init__()
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.auth = hs.get_auth()
|
||||
self.main_uri = hs.config.worker_main_http_uri
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, user_id):
|
||||
# Pass through the auth headers, if any, in case the access token
|
||||
# is there.
|
||||
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
|
||||
headers = {"Authorization": auth_headers}
|
||||
|
||||
try:
|
||||
result = yield self.http_client.get_json(
|
||||
self.main_uri + request.uri.decode("ascii"), headers=headers
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse_error()
|
||||
|
||||
return 200, result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, request, user_id):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
return 200, {}
|
||||
|
||||
|
||||
class KeyUploadServlet(RestServlet):
|
||||
PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
|
||||
|
||||
def __init__(self, hs):
|
||||
"""
|
||||
Args:
|
||||
hs (synapse.server.HomeServer): server
|
||||
"""
|
||||
super(KeyUploadServlet, self).__init__()
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.main_uri = hs.config.worker_main_http_uri
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request, device_id):
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
user_id = requester.user.to_string()
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
if device_id is not None:
|
||||
# passing the device_id here is deprecated; however, we allow it
|
||||
# for now for compatibility with older clients.
|
||||
if requester.device_id is not None and device_id != requester.device_id:
|
||||
logger.warning(
|
||||
"Client uploading keys for a different device "
|
||||
"(logged in as %s, uploading for %s)",
|
||||
requester.device_id,
|
||||
device_id,
|
||||
)
|
||||
else:
|
||||
device_id = requester.device_id
|
||||
|
||||
if device_id is None:
|
||||
raise SynapseError(
|
||||
400, "To upload keys, you must pass device_id when authenticating"
|
||||
)
|
||||
|
||||
if body:
|
||||
# They're actually trying to upload something, proxy to main synapse.
|
||||
# Pass through the auth headers, if any, in case the access token
|
||||
# is there.
|
||||
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
|
||||
headers = {"Authorization": auth_headers}
|
||||
result = yield self.http_client.post_json_get_json(
|
||||
self.main_uri + request.uri.decode("ascii"), body, headers=headers
|
||||
)
|
||||
|
||||
return 200, result
|
||||
else:
|
||||
# Just interested in counts.
|
||||
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
|
||||
return 200, {"one_time_key_counts": result}
|
||||
|
||||
|
||||
class FrontendProxySlavedStore(
|
||||
SlavedDeviceStore,
|
||||
SlavedClientIpStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class FrontendProxyServer(HomeServer):
|
||||
DATASTORE_CLASS = FrontendProxySlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
KeyUploadServlet(self).register(resource)
|
||||
|
||||
# If presence is disabled, use the stub servlet that does
|
||||
# not allow sending presence
|
||||
if not self.config.use_presence:
|
||||
PresenceStatusStubServlet(self).register(resource)
|
||||
|
||||
resources.update(
|
||||
{
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
"/_matrix/client/v2_alpha": resource,
|
||||
"/_matrix/client/api/v1": resource,
|
||||
}
|
||||
)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
reactor=self.get_reactor(),
|
||||
)
|
||||
|
||||
logger.info("Synapse client reader now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return ReplicationClientHandler(self.get_datastore())
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config("Synapse frontend proxy", config_options)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.frontend_proxy"
|
||||
|
||||
assert config.worker_main_http_uri is not None
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
ss = FrontendProxyServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
)
|
||||
|
||||
_base.start_worker_reactor("synapse-frontend-proxy", config)
|
||||
|
||||
from synapse.app.generic_worker import start
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
|
||||
923
synapse/app/generic_worker.py
Normal file
923
synapse/app/generic_worker.py
Normal file
@@ -0,0 +1,923 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import contextlib
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
import synapse.events
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import HttpResponseException, SynapseError
|
||||
from synapse.api.urls import (
|
||||
CLIENT_API_PREFIX,
|
||||
FEDERATION_PREFIX,
|
||||
LEGACY_MEDIA_PREFIX,
|
||||
MEDIA_PREFIX,
|
||||
SERVER_KEY_V2_PREFIX,
|
||||
)
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.federation import send_queue
|
||||
from synapse.federation.transport.server import TransportLayerServer
|
||||
from synapse.handlers.presence import PresenceHandler, get_interested_parties
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
|
||||
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
|
||||
from synapse.replication.slave.storage.keys import SlavedKeyStore
|
||||
from synapse.replication.slave.storage.presence import SlavedPresenceStore
|
||||
from synapse.replication.slave.storage.profile import SlavedProfileStore
|
||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.replication.tcp.streams._base import (
|
||||
DeviceListsStream,
|
||||
ReceiptsStream,
|
||||
ToDeviceStream,
|
||||
)
|
||||
from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
|
||||
from synapse.rest.admin import register_servlets_for_media_repo
|
||||
from synapse.rest.client.v1 import events
|
||||
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
|
||||
from synapse.rest.client.v1.login import LoginRestServlet
|
||||
from synapse.rest.client.v1.profile import (
|
||||
ProfileAvatarURLRestServlet,
|
||||
ProfileDisplaynameRestServlet,
|
||||
ProfileRestServlet,
|
||||
)
|
||||
from synapse.rest.client.v1.push_rule import PushRuleRestServlet
|
||||
from synapse.rest.client.v1.room import (
|
||||
JoinedRoomMemberListRestServlet,
|
||||
JoinRoomAliasServlet,
|
||||
PublicRoomListRestServlet,
|
||||
RoomEventContextServlet,
|
||||
RoomInitialSyncRestServlet,
|
||||
RoomMemberListRestServlet,
|
||||
RoomMembershipRestServlet,
|
||||
RoomMessageListRestServlet,
|
||||
RoomSendEventRestServlet,
|
||||
RoomStateEventRestServlet,
|
||||
RoomStateRestServlet,
|
||||
)
|
||||
from synapse.rest.client.v1.voip import VoipRestServlet
|
||||
from synapse.rest.client.v2_alpha import groups, sync, user_directory
|
||||
from synapse.rest.client.v2_alpha._base import client_patterns
|
||||
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
|
||||
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
|
||||
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
|
||||
from synapse.rest.client.versions import VersionsRestServlet
|
||||
from synapse.rest.key.v2 import KeyApiV2Resource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
|
||||
from synapse.storage.data_stores.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.presence import UserPresenceState
|
||||
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
|
||||
from synapse.types import ReadReceipt
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.generic_worker")
|
||||
|
||||
|
||||
class PresenceStatusStubServlet(RestServlet):
|
||||
"""If presence is disabled this servlet can be used to stub out setting
|
||||
presence status, while proxying the getters to the master instance.
|
||||
"""
|
||||
|
||||
PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(PresenceStatusStubServlet, self).__init__()
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.auth = hs.get_auth()
|
||||
self.main_uri = hs.config.worker_main_http_uri
|
||||
|
||||
async def on_GET(self, request, user_id):
|
||||
# Pass through the auth headers, if any, in case the access token
|
||||
# is there.
|
||||
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
|
||||
headers = {"Authorization": auth_headers}
|
||||
|
||||
try:
|
||||
result = await self.http_client.get_json(
|
||||
self.main_uri + request.uri.decode("ascii"), headers=headers
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse_error()
|
||||
|
||||
return 200, result
|
||||
|
||||
async def on_PUT(self, request, user_id):
|
||||
await self.auth.get_user_by_req(request)
|
||||
return 200, {}
|
||||
|
||||
|
||||
class KeyUploadServlet(RestServlet):
|
||||
"""An implementation of the `KeyUploadServlet` that responds to read only
|
||||
requests, but otherwise proxies through to the master instance.
|
||||
"""
|
||||
|
||||
PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
|
||||
|
||||
def __init__(self, hs):
|
||||
"""
|
||||
Args:
|
||||
hs (synapse.server.HomeServer): server
|
||||
"""
|
||||
super(KeyUploadServlet, self).__init__()
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.main_uri = hs.config.worker_main_http_uri
|
||||
|
||||
async def on_POST(self, request, device_id):
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
||||
user_id = requester.user.to_string()
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
if device_id is not None:
|
||||
# passing the device_id here is deprecated; however, we allow it
|
||||
# for now for compatibility with older clients.
|
||||
if requester.device_id is not None and device_id != requester.device_id:
|
||||
logger.warning(
|
||||
"Client uploading keys for a different device "
|
||||
"(logged in as %s, uploading for %s)",
|
||||
requester.device_id,
|
||||
device_id,
|
||||
)
|
||||
else:
|
||||
device_id = requester.device_id
|
||||
|
||||
if device_id is None:
|
||||
raise SynapseError(
|
||||
400, "To upload keys, you must pass device_id when authenticating"
|
||||
)
|
||||
|
||||
if body:
|
||||
# They're actually trying to upload something, proxy to main synapse.
|
||||
# Pass through the auth headers, if any, in case the access token
|
||||
# is there.
|
||||
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
|
||||
headers = {"Authorization": auth_headers}
|
||||
result = await self.http_client.post_json_get_json(
|
||||
self.main_uri + request.uri.decode("ascii"), body, headers=headers
|
||||
)
|
||||
|
||||
return 200, result
|
||||
else:
|
||||
# Just interested in counts.
|
||||
result = await self.store.count_e2e_one_time_keys(user_id, device_id)
|
||||
return 200, {"one_time_key_counts": result}
|
||||
|
||||
|
||||
UPDATE_SYNCING_USERS_MS = 10 * 1000
|
||||
|
||||
|
||||
class GenericWorkerPresence(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.store = hs.get_datastore()
|
||||
self.user_to_num_current_syncs = {}
|
||||
self.clock = hs.get_clock()
|
||||
self.notifier = hs.get_notifier()
|
||||
|
||||
active_presence = self.store.take_presence_startup_info()
|
||||
self.user_to_current_state = {state.user_id: state for state in active_presence}
|
||||
|
||||
# user_id -> last_sync_ms. Lists the users that have stopped syncing
|
||||
# but we haven't notified the master of that yet
|
||||
self.users_going_offline = {}
|
||||
|
||||
self._send_stop_syncing_loop = self.clock.looping_call(
|
||||
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
|
||||
)
|
||||
|
||||
self.process_id = random_string(16)
|
||||
logger.info("Presence process_id is %r", self.process_id)
|
||||
|
||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
||||
if self.hs.config.use_presence:
|
||||
self.hs.get_tcp_replication().send_user_sync(
|
||||
user_id, is_syncing, last_sync_ms
|
||||
)
|
||||
|
||||
def mark_as_coming_online(self, user_id):
|
||||
"""A user has started syncing. Send a UserSync to the master, unless they
|
||||
had recently stopped syncing.
|
||||
|
||||
Args:
|
||||
user_id (str)
|
||||
"""
|
||||
going_offline = self.users_going_offline.pop(user_id, None)
|
||||
if not going_offline:
|
||||
# Safe to skip because we haven't yet told the master they were offline
|
||||
self.send_user_sync(user_id, True, self.clock.time_msec())
|
||||
|
||||
def mark_as_going_offline(self, user_id):
|
||||
"""A user has stopped syncing. We wait before notifying the master as
|
||||
its likely they'll come back soon. This allows us to avoid sending
|
||||
a stopped syncing immediately followed by a started syncing notification
|
||||
to the master
|
||||
|
||||
Args:
|
||||
user_id (str)
|
||||
"""
|
||||
self.users_going_offline[user_id] = self.clock.time_msec()
|
||||
|
||||
def send_stop_syncing(self):
|
||||
"""Check if there are any users who have stopped syncing a while ago
|
||||
and haven't come back yet. If there are poke the master about them.
|
||||
"""
|
||||
now = self.clock.time_msec()
|
||||
for user_id, last_sync_ms in list(self.users_going_offline.items()):
|
||||
if now - last_sync_ms > UPDATE_SYNCING_USERS_MS:
|
||||
self.users_going_offline.pop(user_id, None)
|
||||
self.send_user_sync(user_id, False, last_sync_ms)
|
||||
|
||||
def set_state(self, user, state, ignore_status_msg=False):
|
||||
# TODO Hows this supposed to work?
|
||||
return defer.succeed(None)
|
||||
|
||||
get_states = __func__(PresenceHandler.get_states)
|
||||
get_state = __func__(PresenceHandler.get_state)
|
||||
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
|
||||
|
||||
def user_syncing(self, user_id, affect_presence):
|
||||
if affect_presence:
|
||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
|
||||
# If we went from no in flight sync to some, notify replication
|
||||
if self.user_to_num_current_syncs[user_id] == 1:
|
||||
self.mark_as_coming_online(user_id)
|
||||
|
||||
def _end():
|
||||
# We check that the user_id is in user_to_num_current_syncs because
|
||||
# user_to_num_current_syncs may have been cleared if we are
|
||||
# shutting down.
|
||||
if affect_presence and user_id in self.user_to_num_current_syncs:
|
||||
self.user_to_num_current_syncs[user_id] -= 1
|
||||
|
||||
# If we went from one in flight sync to non, notify replication
|
||||
if self.user_to_num_current_syncs[user_id] == 0:
|
||||
self.mark_as_going_offline(user_id)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _user_syncing():
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
_end()
|
||||
|
||||
return defer.succeed(_user_syncing())
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def notify_from_replication(self, states, stream_id):
|
||||
parties = yield get_interested_parties(self.store, states)
|
||||
room_ids_to_states, users_to_states = parties
|
||||
|
||||
self.notifier.on_new_event(
|
||||
"presence_key",
|
||||
stream_id,
|
||||
rooms=room_ids_to_states.keys(),
|
||||
users=users_to_states.keys(),
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def process_replication_rows(self, token, rows):
|
||||
states = [
|
||||
UserPresenceState(
|
||||
row.user_id,
|
||||
row.state,
|
||||
row.last_active_ts,
|
||||
row.last_federation_update_ts,
|
||||
row.last_user_sync_ts,
|
||||
row.status_msg,
|
||||
row.currently_active,
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
for state in states:
|
||||
self.user_to_current_state[state.user_id] = state
|
||||
|
||||
stream_id = token
|
||||
yield self.notify_from_replication(states, stream_id)
|
||||
|
||||
def get_currently_syncing_users(self):
|
||||
if self.hs.config.use_presence:
|
||||
return [
|
||||
user_id
|
||||
for user_id, count in self.user_to_num_current_syncs.items()
|
||||
if count > 0
|
||||
]
|
||||
else:
|
||||
return set()
|
||||
|
||||
|
||||
class GenericWorkerTyping(object):
|
||||
def __init__(self, hs):
|
||||
self._latest_room_serial = 0
|
||||
self._reset()
|
||||
|
||||
def _reset(self):
|
||||
"""
|
||||
Reset the typing handler's data caches.
|
||||
"""
|
||||
# map room IDs to serial numbers
|
||||
self._room_serials = {}
|
||||
# map room IDs to sets of users currently typing
|
||||
self._room_typing = {}
|
||||
|
||||
def stream_positions(self):
|
||||
# We must update this typing token from the response of the previous
|
||||
# sync. In particular, the stream id may "reset" back to zero/a low
|
||||
# value which we *must* use for the next replication request.
|
||||
return {"typing": self._latest_room_serial}
|
||||
|
||||
def process_replication_rows(self, token, rows):
|
||||
if self._latest_room_serial > token:
|
||||
# The master has gone backwards. To prevent inconsistent data, just
|
||||
# clear everything.
|
||||
self._reset()
|
||||
|
||||
# Set the latest serial token to whatever the server gave us.
|
||||
self._latest_room_serial = token
|
||||
|
||||
for row in rows:
|
||||
self._room_serials[row.room_id] = token
|
||||
self._room_typing[row.room_id] = row.user_ids
|
||||
|
||||
|
||||
class GenericWorkerSlavedStore(
|
||||
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
|
||||
# rather than going via the correct worker.
|
||||
UserDirectoryStore,
|
||||
SlavedDeviceInboxStore,
|
||||
SlavedDeviceStore,
|
||||
SlavedReceiptsStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedGroupServerStore,
|
||||
SlavedAccountDataStore,
|
||||
SlavedPusherStore,
|
||||
SlavedEventStore,
|
||||
SlavedKeyStore,
|
||||
RoomStore,
|
||||
DirectoryStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedTransactionStore,
|
||||
SlavedProfileStore,
|
||||
SlavedClientIpStore,
|
||||
SlavedPresenceStore,
|
||||
SlavedFilteringStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
MediaRepositoryStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
def __init__(self, database, db_conn, hs):
|
||||
super(GenericWorkerSlavedStore, self).__init__(database, db_conn, hs)
|
||||
|
||||
# We pull out the current federation stream position now so that we
|
||||
# always have a known value for the federation position in memory so
|
||||
# that we don't have to bounce via a deferred once when we start the
|
||||
# replication streams.
|
||||
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
|
||||
|
||||
def _get_federation_out_pos(self, db_conn):
|
||||
sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
|
||||
sql = self.database_engine.convert_param_style(sql)
|
||||
|
||||
txn = db_conn.cursor()
|
||||
txn.execute(sql, ("federation",))
|
||||
rows = txn.fetchall()
|
||||
txn.close()
|
||||
|
||||
return rows[0][0] if rows else -1
|
||||
|
||||
|
||||
class GenericWorkerServer(HomeServer):
|
||||
DATASTORE_CLASS = GenericWorkerSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
|
||||
PublicRoomListRestServlet(self).register(resource)
|
||||
RoomMemberListRestServlet(self).register(resource)
|
||||
JoinedRoomMemberListRestServlet(self).register(resource)
|
||||
RoomStateRestServlet(self).register(resource)
|
||||
RoomEventContextServlet(self).register(resource)
|
||||
RoomMessageListRestServlet(self).register(resource)
|
||||
RegisterRestServlet(self).register(resource)
|
||||
LoginRestServlet(self).register(resource)
|
||||
ThreepidRestServlet(self).register(resource)
|
||||
KeyQueryServlet(self).register(resource)
|
||||
KeyChangesServlet(self).register(resource)
|
||||
VoipRestServlet(self).register(resource)
|
||||
PushRuleRestServlet(self).register(resource)
|
||||
VersionsRestServlet(self).register(resource)
|
||||
RoomSendEventRestServlet(self).register(resource)
|
||||
RoomMembershipRestServlet(self).register(resource)
|
||||
RoomStateEventRestServlet(self).register(resource)
|
||||
JoinRoomAliasServlet(self).register(resource)
|
||||
ProfileAvatarURLRestServlet(self).register(resource)
|
||||
ProfileDisplaynameRestServlet(self).register(resource)
|
||||
ProfileRestServlet(self).register(resource)
|
||||
KeyUploadServlet(self).register(resource)
|
||||
|
||||
sync.register_servlets(self, resource)
|
||||
events.register_servlets(self, resource)
|
||||
InitialSyncRestServlet(self).register(resource)
|
||||
RoomInitialSyncRestServlet(self).register(resource)
|
||||
|
||||
user_directory.register_servlets(self, resource)
|
||||
|
||||
# If presence is disabled, use the stub servlet that does
|
||||
# not allow sending presence
|
||||
if not self.config.use_presence:
|
||||
PresenceStatusStubServlet(self).register(resource)
|
||||
|
||||
groups.register_servlets(self, resource)
|
||||
|
||||
resources.update({CLIENT_API_PREFIX: resource})
|
||||
elif name == "federation":
|
||||
resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
|
||||
elif name == "media":
|
||||
if self.config.can_load_media_repo:
|
||||
media_repo = self.get_media_repository_resource()
|
||||
|
||||
# We need to serve the admin servlets for media on the
|
||||
# worker.
|
||||
admin_resource = JsonResource(self, canonical_json=False)
|
||||
register_servlets_for_media_repo(self, admin_resource)
|
||||
|
||||
resources.update(
|
||||
{
|
||||
MEDIA_PREFIX: media_repo,
|
||||
LEGACY_MEDIA_PREFIX: media_repo,
|
||||
"/_synapse/admin": admin_resource,
|
||||
}
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"A 'media' listener is configured but the media"
|
||||
" repository is disabled. Ignoring."
|
||||
)
|
||||
|
||||
if name == "openid" and "federation" not in res["names"]:
|
||||
# Only load the openid resource separately if federation resource
|
||||
# is not specified since federation resource includes openid
|
||||
# resource.
|
||||
resources.update(
|
||||
{
|
||||
FEDERATION_PREFIX: TransportLayerServer(
|
||||
self, servlet_groups=["openid"]
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
if name in ["keys", "federation"]:
|
||||
resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
reactor=self.get_reactor(),
|
||||
)
|
||||
|
||||
logger.info("Synapse worker now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def remove_pusher(self, app_id, push_key, user_id):
|
||||
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return GenericWorkerReplicationHandler(self)
|
||||
|
||||
def build_presence_handler(self):
|
||||
return GenericWorkerPresence(self)
|
||||
|
||||
def build_typing_handler(self):
|
||||
return GenericWorkerTyping(self)
|
||||
|
||||
|
||||
class GenericWorkerReplicationHandler(ReplicationClientHandler):
|
||||
def __init__(self, hs):
|
||||
super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore())
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.typing_handler = hs.get_typing_handler()
|
||||
# NB this is a SynchrotronPresence, not a normal PresenceHandler
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.notifier = hs.get_notifier()
|
||||
|
||||
self.notify_pushers = hs.config.start_pushers
|
||||
self.pusher_pool = hs.get_pusherpool()
|
||||
|
||||
if hs.config.send_federation:
|
||||
self.send_handler = FederationSenderHandler(hs, self)
|
||||
else:
|
||||
self.send_handler = None
|
||||
|
||||
async def on_rdata(self, stream_name, token, rows):
|
||||
await super(GenericWorkerReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
run_in_background(self.process_and_notify, stream_name, token, rows)
|
||||
|
||||
def get_streams_to_replicate(self):
|
||||
args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
|
||||
args.update(self.typing_handler.stream_positions())
|
||||
if self.send_handler:
|
||||
args.update(self.send_handler.stream_positions())
|
||||
return args
|
||||
|
||||
def get_currently_syncing_users(self):
|
||||
return self.presence_handler.get_currently_syncing_users()
|
||||
|
||||
async def process_and_notify(self, stream_name, token, rows):
|
||||
try:
|
||||
if self.send_handler:
|
||||
self.send_handler.process_replication_rows(stream_name, token, rows)
|
||||
|
||||
if stream_name == "events":
|
||||
# We shouldn't get multiple rows per token for events stream, so
|
||||
# we don't need to optimise this for multiple rows.
|
||||
for row in rows:
|
||||
if row.type != EventsStreamEventRow.TypeId:
|
||||
continue
|
||||
assert isinstance(row, EventsStreamRow)
|
||||
|
||||
event = await self.store.get_event(
|
||||
row.data.event_id, allow_rejected=True
|
||||
)
|
||||
if event.rejected_reason:
|
||||
continue
|
||||
|
||||
extra_users = ()
|
||||
if event.type == EventTypes.Member:
|
||||
extra_users = (event.state_key,)
|
||||
max_token = self.store.get_room_max_stream_ordering()
|
||||
self.notifier.on_new_room_event(
|
||||
event, token, max_token, extra_users
|
||||
)
|
||||
|
||||
await self.pusher_pool.on_new_notifications(token, token)
|
||||
elif stream_name == "push_rules":
|
||||
self.notifier.on_new_event(
|
||||
"push_rules_key", token, users=[row.user_id for row in rows]
|
||||
)
|
||||
elif stream_name in ("account_data", "tag_account_data"):
|
||||
self.notifier.on_new_event(
|
||||
"account_data_key", token, users=[row.user_id for row in rows]
|
||||
)
|
||||
elif stream_name == "receipts":
|
||||
self.notifier.on_new_event(
|
||||
"receipt_key", token, rooms=[row.room_id for row in rows]
|
||||
)
|
||||
await self.pusher_pool.on_new_receipts(
|
||||
token, token, {row.room_id for row in rows}
|
||||
)
|
||||
elif stream_name == "typing":
|
||||
self.typing_handler.process_replication_rows(token, rows)
|
||||
self.notifier.on_new_event(
|
||||
"typing_key", token, rooms=[row.room_id for row in rows]
|
||||
)
|
||||
elif stream_name == "to_device":
|
||||
entities = [row.entity for row in rows if row.entity.startswith("@")]
|
||||
if entities:
|
||||
self.notifier.on_new_event("to_device_key", token, users=entities)
|
||||
elif stream_name == "device_lists":
|
||||
all_room_ids = set()
|
||||
for row in rows:
|
||||
room_ids = await self.store.get_rooms_for_user(row.user_id)
|
||||
all_room_ids.update(room_ids)
|
||||
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
|
||||
elif stream_name == "presence":
|
||||
await self.presence_handler.process_replication_rows(token, rows)
|
||||
elif stream_name == "receipts":
|
||||
self.notifier.on_new_event(
|
||||
"groups_key", token, users=[row.user_id for row in rows]
|
||||
)
|
||||
elif stream_name == "pushers":
|
||||
for row in rows:
|
||||
if row.deleted:
|
||||
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
|
||||
else:
|
||||
await self.start_pusher(row.user_id, row.app_id, row.pushkey)
|
||||
except Exception:
|
||||
logger.exception("Error processing replication")
|
||||
|
||||
def stop_pusher(self, user_id, app_id, pushkey):
|
||||
if not self.notify_pushers:
|
||||
return
|
||||
|
||||
key = "%s:%s" % (app_id, pushkey)
|
||||
pushers_for_user = self.pusher_pool.pushers.get(user_id, {})
|
||||
pusher = pushers_for_user.pop(key, None)
|
||||
if pusher is None:
|
||||
return
|
||||
logger.info("Stopping pusher %r / %r", user_id, key)
|
||||
pusher.on_stop()
|
||||
|
||||
async def start_pusher(self, user_id, app_id, pushkey):
|
||||
if not self.notify_pushers:
|
||||
return
|
||||
|
||||
key = "%s:%s" % (app_id, pushkey)
|
||||
logger.info("Starting pusher %r / %r", user_id, key)
|
||||
return await self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
|
||||
|
||||
def on_remote_server_up(self, server: str):
|
||||
"""Called when get a new REMOTE_SERVER_UP command."""
|
||||
|
||||
# Let's wake up the transaction queue for the server in case we have
|
||||
# pending stuff to send to it.
|
||||
if self.send_handler:
|
||||
self.send_handler.wake_destination(server)
|
||||
|
||||
|
||||
class FederationSenderHandler(object):
|
||||
"""Processes the replication stream and forwards the appropriate entries
|
||||
to the federation sender.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: GenericWorkerServer, replication_client):
|
||||
self.store = hs.get_datastore()
|
||||
self._is_mine_id = hs.is_mine_id
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
self.replication_client = replication_client
|
||||
|
||||
self.federation_position = self.store.federation_out_pos_startup
|
||||
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
|
||||
|
||||
self._last_ack = self.federation_position
|
||||
|
||||
self._room_serials = {}
|
||||
self._room_typing = {}
|
||||
|
||||
def on_start(self):
|
||||
# There may be some events that are persisted but haven't been sent,
|
||||
# so send them now.
|
||||
self.federation_sender.notify_new_events(
|
||||
self.store.get_room_max_stream_ordering()
|
||||
)
|
||||
|
||||
def wake_destination(self, server: str):
|
||||
self.federation_sender.wake_destination(server)
|
||||
|
||||
def stream_positions(self):
|
||||
return {"federation": self.federation_position}
|
||||
|
||||
def process_replication_rows(self, stream_name, token, rows):
|
||||
# The federation stream contains things that we want to send out, e.g.
|
||||
# presence, typing, etc.
|
||||
if stream_name == "federation":
|
||||
send_queue.process_rows_for_federation(self.federation_sender, rows)
|
||||
run_in_background(self.update_token, token)
|
||||
|
||||
# We also need to poke the federation sender when new events happen
|
||||
elif stream_name == "events":
|
||||
self.federation_sender.notify_new_events(token)
|
||||
|
||||
# ... and when new receipts happen
|
||||
elif stream_name == ReceiptsStream.NAME:
|
||||
run_as_background_process(
|
||||
"process_receipts_for_federation", self._on_new_receipts, rows
|
||||
)
|
||||
|
||||
# ... as well as device updates and messages
|
||||
elif stream_name == DeviceListsStream.NAME:
|
||||
hosts = {row.destination for row in rows}
|
||||
for host in hosts:
|
||||
self.federation_sender.send_device_messages(host)
|
||||
|
||||
elif stream_name == ToDeviceStream.NAME:
|
||||
# The to_device stream includes stuff to be pushed to both local
|
||||
# clients and remote servers, so we ignore entities that start with
|
||||
# '@' (since they'll be local users rather than destinations).
|
||||
hosts = {row.entity for row in rows if not row.entity.startswith("@")}
|
||||
for host in hosts:
|
||||
self.federation_sender.send_device_messages(host)
|
||||
|
||||
async def _on_new_receipts(self, rows):
|
||||
"""
|
||||
Args:
|
||||
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
|
||||
new receipts to be processed
|
||||
"""
|
||||
for receipt in rows:
|
||||
# we only want to send on receipts for our own users
|
||||
if not self._is_mine_id(receipt.user_id):
|
||||
continue
|
||||
receipt_info = ReadReceipt(
|
||||
receipt.room_id,
|
||||
receipt.receipt_type,
|
||||
receipt.user_id,
|
||||
[receipt.event_id],
|
||||
receipt.data,
|
||||
)
|
||||
await self.federation_sender.send_read_receipt(receipt_info)
|
||||
|
||||
async def update_token(self, token):
|
||||
try:
|
||||
self.federation_position = token
|
||||
|
||||
# We linearize here to ensure we don't have races updating the token
|
||||
with (await self._fed_position_linearizer.queue(None)):
|
||||
if self._last_ack < self.federation_position:
|
||||
await self.store.update_federation_out_pos(
|
||||
"federation", self.federation_position
|
||||
)
|
||||
|
||||
# We ACK this token over replication so that the master can drop
|
||||
# its in memory queues
|
||||
self.replication_client.send_federation_ack(
|
||||
self.federation_position
|
||||
)
|
||||
self._last_ack = self.federation_position
|
||||
except Exception:
|
||||
logger.exception("Error updating federation stream position")
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config("Synapse worker", config_options)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
# For backwards compatibility let any of the old app names.
|
||||
assert config.worker_app in (
|
||||
"synapse.app.appservice",
|
||||
"synapse.app.client_reader",
|
||||
"synapse.app.event_creator",
|
||||
"synapse.app.federation_reader",
|
||||
"synapse.app.federation_sender",
|
||||
"synapse.app.frontend_proxy",
|
||||
"synapse.app.generic_worker",
|
||||
"synapse.app.media_repository",
|
||||
"synapse.app.pusher",
|
||||
"synapse.app.synchrotron",
|
||||
"synapse.app.user_dir",
|
||||
)
|
||||
|
||||
if config.worker_app == "synapse.app.appservice":
|
||||
if config.notify_appservices:
|
||||
sys.stderr.write(
|
||||
"\nThe appservices must be disabled in the main synapse process"
|
||||
"\nbefore they can be run in a separate worker."
|
||||
"\nPlease add ``notify_appservices: false`` to the main config"
|
||||
"\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Force the appservice to start since they will be disabled in the main config
|
||||
config.notify_appservices = True
|
||||
|
||||
if config.worker_app == "synapse.app.pusher":
|
||||
if config.start_pushers:
|
||||
sys.stderr.write(
|
||||
"\nThe pushers must be disabled in the main synapse process"
|
||||
"\nbefore they can be run in a separate worker."
|
||||
"\nPlease add ``start_pushers: false`` to the main config"
|
||||
"\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Force the pushers to start since they will be disabled in the main config
|
||||
config.start_pushers = True
|
||||
|
||||
if config.worker_app == "synapse.app.user_dir":
|
||||
if config.update_user_directory:
|
||||
sys.stderr.write(
|
||||
"\nThe update_user_directory must be disabled in the main synapse process"
|
||||
"\nbefore they can be run in a separate worker."
|
||||
"\nPlease add ``update_user_directory: false`` to the main config"
|
||||
"\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Force the pushers to start since they will be disabled in the main config
|
||||
config.update_user_directory = True
|
||||
|
||||
if config.worker_app == "synapse.app.federation_sender":
|
||||
if config.send_federation:
|
||||
sys.stderr.write(
|
||||
"\nThe send_federation must be disabled in the main synapse process"
|
||||
"\nbefore they can be run in a separate worker."
|
||||
"\nPlease add ``send_federation: false`` to the main config"
|
||||
"\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Force the pushers to start since they will be disabled in the main config
|
||||
config.send_federation = True
|
||||
|
||||
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
ss = GenericWorkerServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
)
|
||||
|
||||
_base.start_worker_reactor("synapse-generic-worker", config)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
start(sys.argv[1:])
|
||||
@@ -298,6 +298,11 @@ class SynapseHomeServer(HomeServer):
|
||||
|
||||
# Gauges to expose monthly active user control metrics
|
||||
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
|
||||
current_mau_by_service_gauge = Gauge(
|
||||
"synapse_admin_mau_current_mau_by_service",
|
||||
"Current MAU by service",
|
||||
["app_service"],
|
||||
)
|
||||
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
|
||||
registered_reserved_users_mau_gauge = Gauge(
|
||||
"synapse_admin_mau:registered_reserved_users",
|
||||
@@ -403,7 +408,6 @@ def setup(config_options):
|
||||
|
||||
_base.start(hs, config.listeners)
|
||||
|
||||
hs.get_pusherpool().start()
|
||||
hs.get_datastore().db.updates.start_doing_background_updates()
|
||||
except Exception:
|
||||
# Print the exception and bail out.
|
||||
@@ -585,12 +589,20 @@ def run(hs):
|
||||
@defer.inlineCallbacks
|
||||
def generate_monthly_active_users():
|
||||
current_mau_count = 0
|
||||
current_mau_count_by_service = {}
|
||||
reserved_users = ()
|
||||
store = hs.get_datastore()
|
||||
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
|
||||
current_mau_count = yield store.get_monthly_active_count()
|
||||
current_mau_count_by_service = (
|
||||
yield store.get_monthly_active_count_by_service()
|
||||
)
|
||||
reserved_users = yield store.get_registered_reserved_users()
|
||||
current_mau_gauge.set(float(current_mau_count))
|
||||
|
||||
for app_service, count in current_mau_count_by_service.items():
|
||||
current_mau_by_service_gauge.labels(app_service).set(float(count))
|
||||
|
||||
registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
|
||||
max_mau_gauge.set(float(hs.config.max_mau_value))
|
||||
|
||||
|
||||
@@ -13,162 +13,11 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import sys
|
||||
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
from synapse import events
|
||||
from synapse.api.urls import LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.admin import register_servlets_for_media_repo
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.media_repository")
|
||||
|
||||
|
||||
class MediaRepositorySlavedStore(
|
||||
RoomStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedClientIpStore,
|
||||
SlavedTransactionStore,
|
||||
BaseSlavedStore,
|
||||
MediaRepositoryStore,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class MediaRepositoryServer(HomeServer):
|
||||
DATASTORE_CLASS = MediaRepositorySlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
elif name == "media":
|
||||
media_repo = self.get_media_repository_resource()
|
||||
|
||||
# We need to serve the admin servlets for media on the
|
||||
# worker.
|
||||
admin_resource = JsonResource(self, canonical_json=False)
|
||||
register_servlets_for_media_repo(self, admin_resource)
|
||||
|
||||
resources.update(
|
||||
{
|
||||
MEDIA_PREFIX: media_repo,
|
||||
LEGACY_MEDIA_PREFIX: media_repo,
|
||||
"/_synapse/admin": admin_resource,
|
||||
}
|
||||
)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
)
|
||||
|
||||
logger.info("Synapse media repository now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return ReplicationClientHandler(self.get_datastore())
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config(
|
||||
"Synapse media repository", config_options
|
||||
)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.media_repository"
|
||||
|
||||
if config.enable_media_repo:
|
||||
_base.quit_with_error(
|
||||
"enable_media_repo must be disabled in the main synapse process\n"
|
||||
"before the media repo can be run in a separate worker.\n"
|
||||
"Please add ``enable_media_repo: false`` to the main config\n"
|
||||
)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
ss = MediaRepositoryServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
)
|
||||
|
||||
_base.start_worker_reactor("synapse-media-repository", config)
|
||||
|
||||
from synapse.app.generic_worker import start
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
|
||||
@@ -13,213 +13,12 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import sys
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
from synapse import events
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.slave.storage._base import __func__
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage import DataStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.pusher")
|
||||
|
||||
|
||||
class PusherSlaveStore(
|
||||
SlavedEventStore,
|
||||
SlavedPusherStore,
|
||||
SlavedReceiptsStore,
|
||||
SlavedAccountDataStore,
|
||||
RoomStore,
|
||||
):
|
||||
update_pusher_last_stream_ordering_and_success = __func__(
|
||||
DataStore.update_pusher_last_stream_ordering_and_success
|
||||
)
|
||||
|
||||
update_pusher_failing_since = __func__(DataStore.update_pusher_failing_since)
|
||||
|
||||
update_pusher_last_stream_ordering = __func__(
|
||||
DataStore.update_pusher_last_stream_ordering
|
||||
)
|
||||
|
||||
get_throttle_params_by_room = __func__(DataStore.get_throttle_params_by_room)
|
||||
|
||||
set_throttle_params = __func__(DataStore.set_throttle_params)
|
||||
|
||||
get_time_of_last_push_action_before = __func__(
|
||||
DataStore.get_time_of_last_push_action_before
|
||||
)
|
||||
|
||||
get_profile_displayname = __func__(DataStore.get_profile_displayname)
|
||||
|
||||
|
||||
class PusherServer(HomeServer):
|
||||
DATASTORE_CLASS = PusherSlaveStore
|
||||
|
||||
def remove_pusher(self, app_id, push_key, user_id):
|
||||
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
)
|
||||
|
||||
logger.info("Synapse pusher now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return PusherReplicationHandler(self)
|
||||
|
||||
|
||||
class PusherReplicationHandler(ReplicationClientHandler):
|
||||
def __init__(self, hs):
|
||||
super(PusherReplicationHandler, self).__init__(hs.get_datastore())
|
||||
|
||||
self.pusher_pool = hs.get_pusherpool()
|
||||
|
||||
async def on_rdata(self, stream_name, token, rows):
|
||||
await super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
run_in_background(self.poke_pushers, stream_name, token, rows)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def poke_pushers(self, stream_name, token, rows):
|
||||
try:
|
||||
if stream_name == "pushers":
|
||||
for row in rows:
|
||||
if row.deleted:
|
||||
yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
|
||||
else:
|
||||
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
|
||||
elif stream_name == "events":
|
||||
yield self.pusher_pool.on_new_notifications(token, token)
|
||||
elif stream_name == "receipts":
|
||||
yield self.pusher_pool.on_new_receipts(
|
||||
token, token, set(row.room_id for row in rows)
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error poking pushers")
|
||||
|
||||
def stop_pusher(self, user_id, app_id, pushkey):
|
||||
key = "%s:%s" % (app_id, pushkey)
|
||||
pushers_for_user = self.pusher_pool.pushers.get(user_id, {})
|
||||
pusher = pushers_for_user.pop(key, None)
|
||||
if pusher is None:
|
||||
return
|
||||
logger.info("Stopping pusher %r / %r", user_id, key)
|
||||
pusher.on_stop()
|
||||
|
||||
def start_pusher(self, user_id, app_id, pushkey):
|
||||
key = "%s:%s" % (app_id, pushkey)
|
||||
logger.info("Starting pusher %r / %r", user_id, key)
|
||||
return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config("Synapse pusher", config_options)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.pusher"
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
if config.start_pushers:
|
||||
sys.stderr.write(
|
||||
"\nThe pushers must be disabled in the main synapse process"
|
||||
"\nbefore they can be run in a separate worker."
|
||||
"\nPlease add ``start_pushers: false`` to the main config"
|
||||
"\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Force the pushers to start since they will be disabled in the main config
|
||||
config.start_pushers = True
|
||||
|
||||
ps = PusherServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
)
|
||||
|
||||
setup_logging(ps, config, use_worker_options=True)
|
||||
|
||||
ps.setup()
|
||||
|
||||
def start():
|
||||
_base.start(ps, config.worker_listeners)
|
||||
ps.get_pusherpool().start()
|
||||
|
||||
reactor.addSystemEventTrigger("before", "startup", start)
|
||||
|
||||
_base.start_worker_reactor("synapse-pusher", config)
|
||||
|
||||
from synapse.app.generic_worker import start
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
ps = start(sys.argv[1:])
|
||||
start(sys.argv[1:])
|
||||
|
||||
@@ -13,454 +13,11 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import contextlib
|
||||
import logging
|
||||
|
||||
import sys
|
||||
|
||||
from six import iteritems
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.handlers.presence import PresenceHandler, get_interested_parties
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
|
||||
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
|
||||
from synapse.replication.slave.storage.presence import SlavedPresenceStore
|
||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
|
||||
from synapse.rest.client.v1 import events
|
||||
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
|
||||
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
|
||||
from synapse.rest.client.v2_alpha import sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.presence import UserPresenceState
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.synchrotron")
|
||||
|
||||
|
||||
class SynchrotronSlavedStore(
|
||||
SlavedReceiptsStore,
|
||||
SlavedAccountDataStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedFilteringStore,
|
||||
SlavedPresenceStore,
|
||||
SlavedGroupServerStore,
|
||||
SlavedDeviceInboxStore,
|
||||
SlavedDeviceStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedEventStore,
|
||||
SlavedClientIpStore,
|
||||
RoomStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
UPDATE_SYNCING_USERS_MS = 10 * 1000
|
||||
|
||||
|
||||
class SynchrotronPresence(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.store = hs.get_datastore()
|
||||
self.user_to_num_current_syncs = {}
|
||||
self.clock = hs.get_clock()
|
||||
self.notifier = hs.get_notifier()
|
||||
|
||||
active_presence = self.store.take_presence_startup_info()
|
||||
self.user_to_current_state = {state.user_id: state for state in active_presence}
|
||||
|
||||
# user_id -> last_sync_ms. Lists the users that have stopped syncing
|
||||
# but we haven't notified the master of that yet
|
||||
self.users_going_offline = {}
|
||||
|
||||
self._send_stop_syncing_loop = self.clock.looping_call(
|
||||
self.send_stop_syncing, 10 * 1000
|
||||
)
|
||||
|
||||
self.process_id = random_string(16)
|
||||
logger.info("Presence process_id is %r", self.process_id)
|
||||
|
||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
||||
if self.hs.config.use_presence:
|
||||
self.hs.get_tcp_replication().send_user_sync(
|
||||
user_id, is_syncing, last_sync_ms
|
||||
)
|
||||
|
||||
def mark_as_coming_online(self, user_id):
|
||||
"""A user has started syncing. Send a UserSync to the master, unless they
|
||||
had recently stopped syncing.
|
||||
|
||||
Args:
|
||||
user_id (str)
|
||||
"""
|
||||
going_offline = self.users_going_offline.pop(user_id, None)
|
||||
if not going_offline:
|
||||
# Safe to skip because we haven't yet told the master they were offline
|
||||
self.send_user_sync(user_id, True, self.clock.time_msec())
|
||||
|
||||
def mark_as_going_offline(self, user_id):
|
||||
"""A user has stopped syncing. We wait before notifying the master as
|
||||
its likely they'll come back soon. This allows us to avoid sending
|
||||
a stopped syncing immediately followed by a started syncing notification
|
||||
to the master
|
||||
|
||||
Args:
|
||||
user_id (str)
|
||||
"""
|
||||
self.users_going_offline[user_id] = self.clock.time_msec()
|
||||
|
||||
def send_stop_syncing(self):
|
||||
"""Check if there are any users who have stopped syncing a while ago
|
||||
and haven't come back yet. If there are poke the master about them.
|
||||
"""
|
||||
now = self.clock.time_msec()
|
||||
for user_id, last_sync_ms in list(self.users_going_offline.items()):
|
||||
if now - last_sync_ms > 10 * 1000:
|
||||
self.users_going_offline.pop(user_id, None)
|
||||
self.send_user_sync(user_id, False, last_sync_ms)
|
||||
|
||||
def set_state(self, user, state, ignore_status_msg=False):
|
||||
# TODO Hows this supposed to work?
|
||||
return defer.succeed(None)
|
||||
|
||||
get_states = __func__(PresenceHandler.get_states)
|
||||
get_state = __func__(PresenceHandler.get_state)
|
||||
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
|
||||
|
||||
def user_syncing(self, user_id, affect_presence):
|
||||
if affect_presence:
|
||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
|
||||
# If we went from no in flight sync to some, notify replication
|
||||
if self.user_to_num_current_syncs[user_id] == 1:
|
||||
self.mark_as_coming_online(user_id)
|
||||
|
||||
def _end():
|
||||
# We check that the user_id is in user_to_num_current_syncs because
|
||||
# user_to_num_current_syncs may have been cleared if we are
|
||||
# shutting down.
|
||||
if affect_presence and user_id in self.user_to_num_current_syncs:
|
||||
self.user_to_num_current_syncs[user_id] -= 1
|
||||
|
||||
# If we went from one in flight sync to non, notify replication
|
||||
if self.user_to_num_current_syncs[user_id] == 0:
|
||||
self.mark_as_going_offline(user_id)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _user_syncing():
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
_end()
|
||||
|
||||
return defer.succeed(_user_syncing())
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def notify_from_replication(self, states, stream_id):
|
||||
parties = yield get_interested_parties(self.store, states)
|
||||
room_ids_to_states, users_to_states = parties
|
||||
|
||||
self.notifier.on_new_event(
|
||||
"presence_key",
|
||||
stream_id,
|
||||
rooms=room_ids_to_states.keys(),
|
||||
users=users_to_states.keys(),
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def process_replication_rows(self, token, rows):
|
||||
states = [
|
||||
UserPresenceState(
|
||||
row.user_id,
|
||||
row.state,
|
||||
row.last_active_ts,
|
||||
row.last_federation_update_ts,
|
||||
row.last_user_sync_ts,
|
||||
row.status_msg,
|
||||
row.currently_active,
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
for state in states:
|
||||
self.user_to_current_state[state.user_id] = state
|
||||
|
||||
stream_id = token
|
||||
yield self.notify_from_replication(states, stream_id)
|
||||
|
||||
def get_currently_syncing_users(self):
|
||||
if self.hs.config.use_presence:
|
||||
return [
|
||||
user_id
|
||||
for user_id, count in iteritems(self.user_to_num_current_syncs)
|
||||
if count > 0
|
||||
]
|
||||
else:
|
||||
return set()
|
||||
|
||||
|
||||
class SynchrotronTyping(object):
|
||||
def __init__(self, hs):
|
||||
self._latest_room_serial = 0
|
||||
self._reset()
|
||||
|
||||
def _reset(self):
|
||||
"""
|
||||
Reset the typing handler's data caches.
|
||||
"""
|
||||
# map room IDs to serial numbers
|
||||
self._room_serials = {}
|
||||
# map room IDs to sets of users currently typing
|
||||
self._room_typing = {}
|
||||
|
||||
def stream_positions(self):
|
||||
# We must update this typing token from the response of the previous
|
||||
# sync. In particular, the stream id may "reset" back to zero/a low
|
||||
# value which we *must* use for the next replication request.
|
||||
return {"typing": self._latest_room_serial}
|
||||
|
||||
def process_replication_rows(self, token, rows):
|
||||
if self._latest_room_serial > token:
|
||||
# The master has gone backwards. To prevent inconsistent data, just
|
||||
# clear everything.
|
||||
self._reset()
|
||||
|
||||
# Set the latest serial token to whatever the server gave us.
|
||||
self._latest_room_serial = token
|
||||
|
||||
for row in rows:
|
||||
self._room_serials[row.room_id] = token
|
||||
self._room_typing[row.room_id] = row.user_ids
|
||||
|
||||
|
||||
class SynchrotronApplicationService(object):
|
||||
def notify_interested_services(self, event):
|
||||
pass
|
||||
|
||||
|
||||
class SynchrotronServer(HomeServer):
|
||||
DATASTORE_CLASS = SynchrotronSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
sync.register_servlets(self, resource)
|
||||
events.register_servlets(self, resource)
|
||||
InitialSyncRestServlet(self).register(resource)
|
||||
RoomInitialSyncRestServlet(self).register(resource)
|
||||
resources.update(
|
||||
{
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
"/_matrix/client/v2_alpha": resource,
|
||||
"/_matrix/client/api/v1": resource,
|
||||
}
|
||||
)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
)
|
||||
|
||||
logger.info("Synapse synchrotron now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return SyncReplicationHandler(self)
|
||||
|
||||
def build_presence_handler(self):
|
||||
return SynchrotronPresence(self)
|
||||
|
||||
def build_typing_handler(self):
|
||||
return SynchrotronTyping(self)
|
||||
|
||||
|
||||
class SyncReplicationHandler(ReplicationClientHandler):
|
||||
def __init__(self, hs):
|
||||
super(SyncReplicationHandler, self).__init__(hs.get_datastore())
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.typing_handler = hs.get_typing_handler()
|
||||
# NB this is a SynchrotronPresence, not a normal PresenceHandler
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.notifier = hs.get_notifier()
|
||||
|
||||
async def on_rdata(self, stream_name, token, rows):
|
||||
await super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
run_in_background(self.process_and_notify, stream_name, token, rows)
|
||||
|
||||
def get_streams_to_replicate(self):
|
||||
args = super(SyncReplicationHandler, self).get_streams_to_replicate()
|
||||
args.update(self.typing_handler.stream_positions())
|
||||
return args
|
||||
|
||||
def get_currently_syncing_users(self):
|
||||
return self.presence_handler.get_currently_syncing_users()
|
||||
|
||||
async def process_and_notify(self, stream_name, token, rows):
|
||||
try:
|
||||
if stream_name == "events":
|
||||
# We shouldn't get multiple rows per token for events stream, so
|
||||
# we don't need to optimise this for multiple rows.
|
||||
for row in rows:
|
||||
if row.type != EventsStreamEventRow.TypeId:
|
||||
continue
|
||||
assert isinstance(row, EventsStreamRow)
|
||||
|
||||
event = await self.store.get_event(
|
||||
row.data.event_id, allow_rejected=True
|
||||
)
|
||||
if event.rejected_reason:
|
||||
continue
|
||||
|
||||
extra_users = ()
|
||||
if event.type == EventTypes.Member:
|
||||
extra_users = (event.state_key,)
|
||||
max_token = self.store.get_room_max_stream_ordering()
|
||||
self.notifier.on_new_room_event(
|
||||
event, token, max_token, extra_users
|
||||
)
|
||||
elif stream_name == "push_rules":
|
||||
self.notifier.on_new_event(
|
||||
"push_rules_key", token, users=[row.user_id for row in rows]
|
||||
)
|
||||
elif stream_name in ("account_data", "tag_account_data"):
|
||||
self.notifier.on_new_event(
|
||||
"account_data_key", token, users=[row.user_id for row in rows]
|
||||
)
|
||||
elif stream_name == "receipts":
|
||||
self.notifier.on_new_event(
|
||||
"receipt_key", token, rooms=[row.room_id for row in rows]
|
||||
)
|
||||
elif stream_name == "typing":
|
||||
self.typing_handler.process_replication_rows(token, rows)
|
||||
self.notifier.on_new_event(
|
||||
"typing_key", token, rooms=[row.room_id for row in rows]
|
||||
)
|
||||
elif stream_name == "to_device":
|
||||
entities = [row.entity for row in rows if row.entity.startswith("@")]
|
||||
if entities:
|
||||
self.notifier.on_new_event("to_device_key", token, users=entities)
|
||||
elif stream_name == "device_lists":
|
||||
all_room_ids = set()
|
||||
for row in rows:
|
||||
room_ids = await self.store.get_rooms_for_user(row.user_id)
|
||||
all_room_ids.update(room_ids)
|
||||
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
|
||||
elif stream_name == "presence":
|
||||
await self.presence_handler.process_replication_rows(token, rows)
|
||||
elif stream_name == "receipts":
|
||||
self.notifier.on_new_event(
|
||||
"groups_key", token, users=[row.user_id for row in rows]
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error processing replication")
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config("Synapse synchrotron", config_options)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.synchrotron"
|
||||
|
||||
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
ss = SynchrotronServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
application_service_handler=SynchrotronApplicationService(),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
)
|
||||
|
||||
_base.start_worker_reactor("synapse-synchrotron", config)
|
||||
|
||||
from synapse.app.generic_worker import start
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
|
||||
@@ -14,217 +14,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import synapse
|
||||
from synapse import events
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.replication.tcp.streams.events import (
|
||||
EventsStream,
|
||||
EventsStreamCurrentStateRow,
|
||||
)
|
||||
from synapse.rest.client.v2_alpha import user_directory
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
|
||||
from synapse.storage.database import Database
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.user_dir")
|
||||
|
||||
|
||||
class UserDirectorySlaveStore(
|
||||
SlavedEventStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedClientIpStore,
|
||||
UserDirectoryStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
def __init__(self, database: Database, db_conn, hs):
|
||||
super(UserDirectorySlaveStore, self).__init__(database, db_conn, hs)
|
||||
|
||||
events_max = self._stream_id_gen.get_current_token()
|
||||
curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
|
||||
db_conn,
|
||||
"current_state_delta_stream",
|
||||
entity_column="room_id",
|
||||
stream_column="stream_id",
|
||||
max_value=events_max, # As we share the stream id with events token
|
||||
limit=1000,
|
||||
)
|
||||
self._curr_state_delta_stream_cache = StreamChangeCache(
|
||||
"_curr_state_delta_stream_cache",
|
||||
min_curr_state_delta_id,
|
||||
prefilled_cache=curr_state_delta_prefill,
|
||||
)
|
||||
|
||||
def stream_positions(self):
|
||||
result = super(UserDirectorySlaveStore, self).stream_positions()
|
||||
return result
|
||||
|
||||
def process_replication_rows(self, stream_name, token, rows):
|
||||
if stream_name == EventsStream.NAME:
|
||||
self._stream_id_gen.advance(token)
|
||||
for row in rows:
|
||||
if row.type != EventsStreamCurrentStateRow.TypeId:
|
||||
continue
|
||||
self._curr_state_delta_stream_cache.entity_has_changed(
|
||||
row.data.room_id, token
|
||||
)
|
||||
return super(UserDirectorySlaveStore, self).process_replication_rows(
|
||||
stream_name, token, rows
|
||||
)
|
||||
|
||||
|
||||
class UserDirectoryServer(HomeServer):
|
||||
DATASTORE_CLASS = UserDirectorySlaveStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
user_directory.register_servlets(self, resource)
|
||||
resources.update(
|
||||
{
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
"/_matrix/client/v2_alpha": resource,
|
||||
"/_matrix/client/api/v1": resource,
|
||||
}
|
||||
)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
),
|
||||
)
|
||||
|
||||
logger.info("Synapse user_dir now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
"Metrics listener configured, but "
|
||||
"enable_metrics is not True!"
|
||||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
def build_tcp_replication(self):
|
||||
return UserDirectoryReplicationHandler(self)
|
||||
|
||||
|
||||
class UserDirectoryReplicationHandler(ReplicationClientHandler):
|
||||
def __init__(self, hs):
|
||||
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.user_directory = hs.get_user_directory_handler()
|
||||
|
||||
async def on_rdata(self, stream_name, token, rows):
|
||||
await super(UserDirectoryReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
if stream_name == EventsStream.NAME:
|
||||
run_in_background(self._notify_directory)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _notify_directory(self):
|
||||
try:
|
||||
yield self.user_directory.notify_new_event()
|
||||
except Exception:
|
||||
logger.exception("Error notifiying user directory of state update")
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config("Synapse user directory", config_options)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + str(e) + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.user_dir"
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
if config.update_user_directory:
|
||||
sys.stderr.write(
|
||||
"\nThe update_user_directory must be disabled in the main synapse process"
|
||||
"\nbefore they can be run in a separate worker."
|
||||
"\nPlease add ``update_user_directory: false`` to the main config"
|
||||
"\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Force the pushers to start since they will be disabled in the main config
|
||||
config.update_user_directory = True
|
||||
|
||||
ss = UserDirectoryServer(
|
||||
config.server_name,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
)
|
||||
|
||||
_base.start_worker_reactor("synapse-user-dir", config)
|
||||
|
||||
from synapse.app.generic_worker import start
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
if __name__ == "__main__":
|
||||
with LoggingContext("main"):
|
||||
|
||||
@@ -24,6 +24,7 @@ from synapse.config import (
|
||||
server,
|
||||
server_notices_config,
|
||||
spam_checker,
|
||||
sso,
|
||||
stats,
|
||||
third_party_event_rules,
|
||||
tls,
|
||||
@@ -57,6 +58,7 @@ class RootConfig:
|
||||
key: key.KeyConfig
|
||||
saml2: saml2_config.SAML2Config
|
||||
cas: cas.CasConfig
|
||||
sso: sso.SSOConfig
|
||||
jwt: jwt_config.JWTConfig
|
||||
password: password.PasswordConfig
|
||||
email: emailconfig.EmailConfig
|
||||
|
||||
@@ -27,6 +27,12 @@ import pkg_resources
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
|
||||
MISSING_PASSWORD_RESET_CONFIG_ERROR = """\
|
||||
Password reset emails are enabled on this homeserver due to a partial
|
||||
'email' block. However, the following required keys are missing:
|
||||
%s
|
||||
"""
|
||||
|
||||
|
||||
class EmailConfig(Config):
|
||||
section = "email"
|
||||
@@ -142,24 +148,18 @@ class EmailConfig(Config):
|
||||
bleach
|
||||
|
||||
if self.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
required = ["smtp_host", "smtp_port", "notif_from"]
|
||||
|
||||
missing = []
|
||||
for k in required:
|
||||
if k not in email_config:
|
||||
missing.append("email." + k)
|
||||
if not self.email_notif_from:
|
||||
missing.append("email.notif_from")
|
||||
|
||||
# public_baseurl is required to build password reset and validation links that
|
||||
# will be emailed to users
|
||||
if config.get("public_baseurl") is None:
|
||||
missing.append("public_baseurl")
|
||||
|
||||
if len(missing) > 0:
|
||||
raise RuntimeError(
|
||||
"Password resets emails are configured to be sent from "
|
||||
"this homeserver due to a partial 'email' block. "
|
||||
"However, the following required keys are missing: %s"
|
||||
% (", ".join(missing),)
|
||||
if missing:
|
||||
raise ConfigError(
|
||||
MISSING_PASSWORD_RESET_CONFIG_ERROR % (", ".join(missing),)
|
||||
)
|
||||
|
||||
# These email templates have placeholders in them, and thus must be
|
||||
@@ -245,32 +245,25 @@ class EmailConfig(Config):
|
||||
)
|
||||
|
||||
if self.email_enable_notifs:
|
||||
required = [
|
||||
"smtp_host",
|
||||
"smtp_port",
|
||||
"notif_from",
|
||||
"notif_template_html",
|
||||
"notif_template_text",
|
||||
]
|
||||
|
||||
missing = []
|
||||
for k in required:
|
||||
if k not in email_config:
|
||||
missing.append(k)
|
||||
|
||||
if len(missing) > 0:
|
||||
raise RuntimeError(
|
||||
"email.enable_notifs is True but required keys are missing: %s"
|
||||
% (", ".join(["email." + k for k in missing]),)
|
||||
)
|
||||
if not self.email_notif_from:
|
||||
missing.append("email.notif_from")
|
||||
|
||||
if config.get("public_baseurl") is None:
|
||||
raise RuntimeError(
|
||||
"email.enable_notifs is True but no public_baseurl is set"
|
||||
missing.append("public_baseurl")
|
||||
|
||||
if missing:
|
||||
raise ConfigError(
|
||||
"email.enable_notifs is True but required keys are missing: %s"
|
||||
% (", ".join(missing),)
|
||||
)
|
||||
|
||||
self.email_notif_template_html = email_config["notif_template_html"]
|
||||
self.email_notif_template_text = email_config["notif_template_text"]
|
||||
self.email_notif_template_html = email_config.get(
|
||||
"notif_template_html", "notif_mail.html"
|
||||
)
|
||||
self.email_notif_template_text = email_config.get(
|
||||
"notif_template_text", "notif_mail.txt"
|
||||
)
|
||||
|
||||
for f in self.email_notif_template_text, self.email_notif_template_html:
|
||||
p = os.path.join(self.email_template_dir, f)
|
||||
@@ -323,10 +316,6 @@ class EmailConfig(Config):
|
||||
#
|
||||
#require_transport_security: true
|
||||
|
||||
# Enable sending emails for messages that the user has missed
|
||||
#
|
||||
#enable_notifs: false
|
||||
|
||||
# notif_from defines the "From" address to use when sending emails.
|
||||
# It must be set if email sending is enabled.
|
||||
#
|
||||
@@ -344,6 +333,11 @@ class EmailConfig(Config):
|
||||
#
|
||||
#app_name: my_branded_matrix_server
|
||||
|
||||
# Uncomment the following to enable sending emails for messages that the user
|
||||
# has missed. Disabled by default.
|
||||
#
|
||||
#enable_notifs: true
|
||||
|
||||
# Uncomment the following to disable automatic subscription to email
|
||||
# notifications for new users. Enabled by default.
|
||||
#
|
||||
|
||||
@@ -38,6 +38,7 @@ from .saml2_config import SAML2Config
|
||||
from .server import ServerConfig
|
||||
from .server_notices_config import ServerNoticesConfig
|
||||
from .spam_checker import SpamCheckerConfig
|
||||
from .sso import SSOConfig
|
||||
from .stats import StatsConfig
|
||||
from .third_party_event_rules import ThirdPartyRulesConfig
|
||||
from .tls import TlsConfig
|
||||
@@ -65,6 +66,7 @@ class HomeServerConfig(RootConfig):
|
||||
KeyConfig,
|
||||
SAML2Config,
|
||||
CasConfig,
|
||||
SSOConfig,
|
||||
JWTConfig,
|
||||
PasswordConfig,
|
||||
EmailConfig,
|
||||
|
||||
@@ -15,6 +15,9 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
import pkg_resources
|
||||
|
||||
from synapse.python_dependencies import DependencyException, check_requirements
|
||||
from synapse.util.module_loader import load_module, load_python_module
|
||||
@@ -160,6 +163,14 @@ class SAML2Config(Config):
|
||||
saml2_config.get("saml_session_lifetime", "5m")
|
||||
)
|
||||
|
||||
template_dir = saml2_config.get("template_dir")
|
||||
if not template_dir:
|
||||
template_dir = pkg_resources.resource_filename("synapse", "res/templates",)
|
||||
|
||||
self.saml2_error_html_content = self.read_file(
|
||||
os.path.join(template_dir, "saml_error.html"), "saml2_config.saml_error",
|
||||
)
|
||||
|
||||
def _default_saml_config_dict(
|
||||
self, required_attributes: set, optional_attributes: set
|
||||
):
|
||||
@@ -325,6 +336,25 @@ class SAML2Config(Config):
|
||||
# The default is 'uid'.
|
||||
#
|
||||
#grandfathered_mxid_source_attribute: upn
|
||||
|
||||
# Directory in which Synapse will try to find the template files below.
|
||||
# If not set, default templates from within the Synapse package will be used.
|
||||
#
|
||||
# DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates.
|
||||
# If you *do* uncomment it, you will need to make sure that all the templates
|
||||
# below are in the directory.
|
||||
#
|
||||
# Synapse will look for the following templates in this directory:
|
||||
#
|
||||
# * HTML page to display to users if something goes wrong during the
|
||||
# authentication process: 'saml_error.html'.
|
||||
#
|
||||
# This template doesn't currently need any variable to render.
|
||||
#
|
||||
# You can see the default templates at:
|
||||
# https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
|
||||
#
|
||||
#template_dir: "res/templates"
|
||||
""" % {
|
||||
"config_dir_path": config_dir_path
|
||||
}
|
||||
|
||||
@@ -1066,12 +1066,12 @@ KNOWN_RESOURCES = (
|
||||
|
||||
|
||||
def _check_resource_config(listeners):
|
||||
resource_names = set(
|
||||
resource_names = {
|
||||
res_name
|
||||
for listener in listeners
|
||||
for res in listener.get("resources", [])
|
||||
for res_name in res.get("names", [])
|
||||
)
|
||||
}
|
||||
|
||||
for resource in resource_names:
|
||||
if resource not in KNOWN_RESOURCES:
|
||||
|
||||
92
synapse/config/sso.py
Normal file
92
synapse/config/sso.py
Normal file
@@ -0,0 +1,92 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import Any, Dict
|
||||
|
||||
import pkg_resources
|
||||
|
||||
from ._base import Config
|
||||
|
||||
|
||||
class SSOConfig(Config):
|
||||
"""SSO Configuration
|
||||
"""
|
||||
|
||||
section = "sso"
|
||||
|
||||
def read_config(self, config, **kwargs):
|
||||
sso_config = config.get("sso") or {} # type: Dict[str, Any]
|
||||
|
||||
# Pick a template directory in order of:
|
||||
# * The sso-specific template_dir
|
||||
# * /path/to/synapse/install/res/templates
|
||||
template_dir = sso_config.get("template_dir")
|
||||
if not template_dir:
|
||||
template_dir = pkg_resources.resource_filename("synapse", "res/templates",)
|
||||
|
||||
self.sso_redirect_confirm_template_dir = template_dir
|
||||
|
||||
self.sso_client_whitelist = sso_config.get("client_whitelist") or []
|
||||
|
||||
def generate_config_section(self, **kwargs):
|
||||
return """\
|
||||
# Additional settings to use with single-sign on systems such as SAML2 and CAS.
|
||||
#
|
||||
sso:
|
||||
# A list of client URLs which are whitelisted so that the user does not
|
||||
# have to confirm giving access to their account to the URL. Any client
|
||||
# whose URL starts with an entry in the following list will not be subject
|
||||
# to an additional confirmation step after the SSO login is completed.
|
||||
#
|
||||
# WARNING: An entry such as "https://my.client" is insecure, because it
|
||||
# will also match "https://my.client.evil.site", exposing your users to
|
||||
# phishing attacks from evil.site. To avoid this, include a slash after the
|
||||
# hostname: "https://my.client/".
|
||||
#
|
||||
# By default, this list is empty.
|
||||
#
|
||||
#client_whitelist:
|
||||
# - https://riot.im/develop
|
||||
# - https://my.custom.client/
|
||||
|
||||
# Directory in which Synapse will try to find the template files below.
|
||||
# If not set, default templates from within the Synapse package will be used.
|
||||
#
|
||||
# DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates.
|
||||
# If you *do* uncomment it, you will need to make sure that all the templates
|
||||
# below are in the directory.
|
||||
#
|
||||
# Synapse will look for the following templates in this directory:
|
||||
#
|
||||
# * HTML page for a confirmation step before redirecting back to the client
|
||||
# with the login token: 'sso_redirect_confirm.html'.
|
||||
#
|
||||
# When rendering, this template is given three variables:
|
||||
# * redirect_url: the URL the user is about to be redirected to. Needs
|
||||
# manual escaping (see
|
||||
# https://jinja.palletsprojects.com/en/2.11.x/templates/#html-escaping).
|
||||
#
|
||||
# * display_url: the same as `redirect_url`, but with the query
|
||||
# parameters stripped. The intention is to have a
|
||||
# human-readable URL to show to users, not to use it as
|
||||
# the final address to redirect to. Needs manual escaping
|
||||
# (see https://jinja.palletsprojects.com/en/2.11.x/templates/#html-escaping).
|
||||
#
|
||||
# * server_name: the homeserver's name.
|
||||
#
|
||||
# You can see the default templates at:
|
||||
# https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
|
||||
#
|
||||
#template_dir: "res/templates"
|
||||
"""
|
||||
@@ -260,7 +260,7 @@ class TlsConfig(Config):
|
||||
crypto.FILETYPE_ASN1, self.tls_certificate
|
||||
)
|
||||
sha256_fingerprint = encode_base64(sha256(x509_certificate_bytes).digest())
|
||||
sha256_fingerprints = set(f["sha256"] for f in self.tls_fingerprints)
|
||||
sha256_fingerprints = {f["sha256"] for f in self.tls_fingerprints}
|
||||
if sha256_fingerprint not in sha256_fingerprints:
|
||||
self.tls_fingerprints.append({"sha256": sha256_fingerprint})
|
||||
|
||||
|
||||
@@ -75,7 +75,7 @@ class ServerContextFactory(ContextFactory):
|
||||
|
||||
|
||||
@implementer(IPolicyForHTTPS)
|
||||
class ClientTLSOptionsFactory(object):
|
||||
class FederationPolicyForHTTPS(object):
|
||||
"""Factory for Twisted SSLClientConnectionCreators that are used to make connections
|
||||
to remote servers for federation.
|
||||
|
||||
@@ -103,15 +103,15 @@ class ClientTLSOptionsFactory(object):
|
||||
# let us do).
|
||||
minTLS = _TLS_VERSION_MAP[config.federation_client_minimum_tls_version]
|
||||
|
||||
self._verify_ssl = CertificateOptions(
|
||||
_verify_ssl = CertificateOptions(
|
||||
trustRoot=trust_root, insecurelyLowerMinimumTo=minTLS
|
||||
)
|
||||
self._verify_ssl_context = self._verify_ssl.getContext()
|
||||
self._verify_ssl_context.set_info_callback(self._context_info_cb)
|
||||
self._verify_ssl_context = _verify_ssl.getContext()
|
||||
self._verify_ssl_context.set_info_callback(_context_info_cb)
|
||||
|
||||
self._no_verify_ssl = CertificateOptions(insecurelyLowerMinimumTo=minTLS)
|
||||
self._no_verify_ssl_context = self._no_verify_ssl.getContext()
|
||||
self._no_verify_ssl_context.set_info_callback(self._context_info_cb)
|
||||
_no_verify_ssl = CertificateOptions(insecurelyLowerMinimumTo=minTLS)
|
||||
self._no_verify_ssl_context = _no_verify_ssl.getContext()
|
||||
self._no_verify_ssl_context.set_info_callback(_context_info_cb)
|
||||
|
||||
def get_options(self, host: bytes):
|
||||
|
||||
@@ -136,23 +136,6 @@ class ClientTLSOptionsFactory(object):
|
||||
|
||||
return SSLClientConnectionCreator(host, ssl_context, should_verify)
|
||||
|
||||
@staticmethod
|
||||
def _context_info_cb(ssl_connection, where, ret):
|
||||
"""The 'information callback' for our openssl context object."""
|
||||
# we assume that the app_data on the connection object has been set to
|
||||
# a TLSMemoryBIOProtocol object. (This is done by SSLClientConnectionCreator)
|
||||
tls_protocol = ssl_connection.get_app_data()
|
||||
try:
|
||||
# ... we further assume that SSLClientConnectionCreator has set the
|
||||
# '_synapse_tls_verifier' attribute to a ConnectionVerifier object.
|
||||
tls_protocol._synapse_tls_verifier.verify_context_info_cb(
|
||||
ssl_connection, where
|
||||
)
|
||||
except: # noqa: E722, taken from the twisted implementation
|
||||
logger.exception("Error during info_callback")
|
||||
f = Failure()
|
||||
tls_protocol.failVerification(f)
|
||||
|
||||
def creatorForNetloc(self, hostname, port):
|
||||
"""Implements the IPolicyForHTTPS interace so that this can be passed
|
||||
directly to agents.
|
||||
@@ -160,6 +143,43 @@ class ClientTLSOptionsFactory(object):
|
||||
return self.get_options(hostname)
|
||||
|
||||
|
||||
@implementer(IPolicyForHTTPS)
|
||||
class RegularPolicyForHTTPS(object):
|
||||
"""Factory for Twisted SSLClientConnectionCreators that are used to make connections
|
||||
to remote servers, for other than federation.
|
||||
|
||||
Always uses the same OpenSSL context object, which uses the default OpenSSL CA
|
||||
trust root.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
trust_root = platformTrust()
|
||||
self._ssl_context = CertificateOptions(trustRoot=trust_root).getContext()
|
||||
self._ssl_context.set_info_callback(_context_info_cb)
|
||||
|
||||
def creatorForNetloc(self, hostname, port):
|
||||
return SSLClientConnectionCreator(hostname, self._ssl_context, True)
|
||||
|
||||
|
||||
def _context_info_cb(ssl_connection, where, ret):
|
||||
"""The 'information callback' for our openssl context objects.
|
||||
|
||||
Note: Once this is set as the info callback on a Context object, the Context should
|
||||
only be used with the SSLClientConnectionCreator.
|
||||
"""
|
||||
# we assume that the app_data on the connection object has been set to
|
||||
# a TLSMemoryBIOProtocol object. (This is done by SSLClientConnectionCreator)
|
||||
tls_protocol = ssl_connection.get_app_data()
|
||||
try:
|
||||
# ... we further assume that SSLClientConnectionCreator has set the
|
||||
# '_synapse_tls_verifier' attribute to a ConnectionVerifier object.
|
||||
tls_protocol._synapse_tls_verifier.verify_context_info_cb(ssl_connection, where)
|
||||
except: # noqa: E722, taken from the twisted implementation
|
||||
logger.exception("Error during info_callback")
|
||||
f = Failure()
|
||||
tls_protocol.failVerification(f)
|
||||
|
||||
|
||||
@implementer(IOpenSSLClientConnectionCreator)
|
||||
class SSLClientConnectionCreator(object):
|
||||
"""Creates openssl connection objects for client connections.
|
||||
|
||||
@@ -140,7 +140,7 @@ def compute_event_signature(
|
||||
Returns:
|
||||
a dictionary in the same format of an event's signatures field.
|
||||
"""
|
||||
redact_json = prune_event_dict(event_dict)
|
||||
redact_json = prune_event_dict(room_version, event_dict)
|
||||
redact_json.pop("age_ts", None)
|
||||
redact_json.pop("unsigned", None)
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
|
||||
@@ -326,9 +326,7 @@ class Keyring(object):
|
||||
verify_requests (list[VerifyJsonRequest]): list of verify requests
|
||||
"""
|
||||
|
||||
remaining_requests = set(
|
||||
(rq for rq in verify_requests if not rq.key_ready.called)
|
||||
)
|
||||
remaining_requests = {rq for rq in verify_requests if not rq.key_ready.called}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def do_iterations():
|
||||
@@ -396,7 +394,7 @@ class Keyring(object):
|
||||
|
||||
results = yield fetcher.get_keys(missing_keys)
|
||||
|
||||
completed = list()
|
||||
completed = []
|
||||
for verify_request in remaining_requests:
|
||||
server_name = verify_request.server_name
|
||||
|
||||
|
||||
@@ -137,7 +137,7 @@ def check(
|
||||
raise AuthError(403, "This room has been marked as unfederatable.")
|
||||
|
||||
# 4. If type is m.room.aliases
|
||||
if event.type == EventTypes.Aliases:
|
||||
if event.type == EventTypes.Aliases and room_version_obj.special_case_aliases_auth:
|
||||
# 4a. If event has no state_key, reject
|
||||
if not event.is_state():
|
||||
raise AuthError(403, "Alias event must be a state event")
|
||||
@@ -152,10 +152,8 @@ def check(
|
||||
)
|
||||
|
||||
# 4c. Otherwise, allow.
|
||||
# This is removed by https://github.com/matrix-org/matrix-doc/pull/2260
|
||||
if room_version_obj.special_case_aliases_auth:
|
||||
logger.debug("Allowing! %s", event)
|
||||
return
|
||||
logger.debug("Allowing! %s", event)
|
||||
return
|
||||
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug("Auth events: %s", [a.event_id for a in auth_events.values()])
|
||||
|
||||
@@ -15,9 +15,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import os
|
||||
from distutils.util import strtobool
|
||||
from typing import Optional, Type
|
||||
from typing import Dict, Optional, Type
|
||||
|
||||
import six
|
||||
|
||||
@@ -199,15 +200,25 @@ class _EventInternalMetadata(object):
|
||||
return self._dict.get("redacted", False)
|
||||
|
||||
|
||||
class EventBase(object):
|
||||
class EventBase(metaclass=abc.ABCMeta):
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def format_version(self) -> int:
|
||||
"""The EventFormatVersion implemented by this event"""
|
||||
...
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
event_dict,
|
||||
signatures={},
|
||||
unsigned={},
|
||||
internal_metadata_dict={},
|
||||
rejected_reason=None,
|
||||
event_dict: JsonDict,
|
||||
room_version: RoomVersion,
|
||||
signatures: Dict[str, Dict[str, str]],
|
||||
unsigned: JsonDict,
|
||||
internal_metadata_dict: JsonDict,
|
||||
rejected_reason: Optional[str],
|
||||
):
|
||||
assert room_version.event_format == self.format_version
|
||||
|
||||
self.room_version = room_version
|
||||
self.signatures = signatures
|
||||
self.unsigned = unsigned
|
||||
self.rejected_reason = rejected_reason
|
||||
@@ -303,7 +314,13 @@ class EventBase(object):
|
||||
class FrozenEvent(EventBase):
|
||||
format_version = EventFormatVersions.V1 # All events of this type are V1
|
||||
|
||||
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
|
||||
def __init__(
|
||||
self,
|
||||
event_dict: JsonDict,
|
||||
room_version: RoomVersion,
|
||||
internal_metadata_dict: JsonDict = {},
|
||||
rejected_reason: Optional[str] = None,
|
||||
):
|
||||
event_dict = dict(event_dict)
|
||||
|
||||
# Signatures is a dict of dicts, and this is faster than doing a
|
||||
@@ -326,8 +343,9 @@ class FrozenEvent(EventBase):
|
||||
|
||||
self._event_id = event_dict["event_id"]
|
||||
|
||||
super(FrozenEvent, self).__init__(
|
||||
super().__init__(
|
||||
frozen_dict,
|
||||
room_version=room_version,
|
||||
signatures=signatures,
|
||||
unsigned=unsigned,
|
||||
internal_metadata_dict=internal_metadata_dict,
|
||||
@@ -352,7 +370,13 @@ class FrozenEvent(EventBase):
|
||||
class FrozenEventV2(EventBase):
|
||||
format_version = EventFormatVersions.V2 # All events of this type are V2
|
||||
|
||||
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
|
||||
def __init__(
|
||||
self,
|
||||
event_dict: JsonDict,
|
||||
room_version: RoomVersion,
|
||||
internal_metadata_dict: JsonDict = {},
|
||||
rejected_reason: Optional[str] = None,
|
||||
):
|
||||
event_dict = dict(event_dict)
|
||||
|
||||
# Signatures is a dict of dicts, and this is faster than doing a
|
||||
@@ -377,8 +401,9 @@ class FrozenEventV2(EventBase):
|
||||
|
||||
self._event_id = None
|
||||
|
||||
super(FrozenEventV2, self).__init__(
|
||||
super().__init__(
|
||||
frozen_dict,
|
||||
room_version=room_version,
|
||||
signatures=signatures,
|
||||
unsigned=unsigned,
|
||||
internal_metadata_dict=internal_metadata_dict,
|
||||
@@ -445,7 +470,7 @@ class FrozenEventV3(FrozenEventV2):
|
||||
return self._event_id
|
||||
|
||||
|
||||
def event_type_from_format_version(format_version: int) -> Type[EventBase]:
|
||||
def _event_type_from_format_version(format_version: int) -> Type[EventBase]:
|
||||
"""Returns the python type to use to construct an Event object for the
|
||||
given event format version.
|
||||
|
||||
@@ -474,5 +499,5 @@ def make_event_from_dict(
|
||||
rejected_reason: Optional[str] = None,
|
||||
) -> EventBase:
|
||||
"""Construct an EventBase from the given event dict"""
|
||||
event_type = event_type_from_format_version(room_version.event_format)
|
||||
return event_type(event_dict, internal_metadata_dict, rejected_reason)
|
||||
event_type = _event_type_from_format_version(room_version.event_format)
|
||||
return event_type(event_dict, room_version, internal_metadata_dict, rejected_reason)
|
||||
|
||||
@@ -23,6 +23,7 @@ from frozendict import frozendict
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, RelationTypes
|
||||
from synapse.api.room_versions import RoomVersion
|
||||
from synapse.util.async_helpers import yieldable_gather_results
|
||||
|
||||
from . import EventBase
|
||||
@@ -35,26 +36,20 @@ from . import EventBase
|
||||
SPLIT_FIELD_REGEX = re.compile(r"(?<!\\)\.")
|
||||
|
||||
|
||||
def prune_event(event):
|
||||
def prune_event(event: EventBase) -> EventBase:
|
||||
""" Returns a pruned version of the given event, which removes all keys we
|
||||
don't know about or think could potentially be dodgy.
|
||||
|
||||
This is used when we "redact" an event. We want to remove all fields that
|
||||
the user has specified, but we do want to keep necessary information like
|
||||
type, state_key etc.
|
||||
|
||||
Args:
|
||||
event (FrozenEvent)
|
||||
|
||||
Returns:
|
||||
FrozenEvent
|
||||
"""
|
||||
pruned_event_dict = prune_event_dict(event.get_dict())
|
||||
pruned_event_dict = prune_event_dict(event.room_version, event.get_dict())
|
||||
|
||||
from . import event_type_from_format_version
|
||||
from . import make_event_from_dict
|
||||
|
||||
pruned_event = event_type_from_format_version(event.format_version)(
|
||||
pruned_event_dict, event.internal_metadata.get_dict()
|
||||
pruned_event = make_event_from_dict(
|
||||
pruned_event_dict, event.room_version, event.internal_metadata.get_dict()
|
||||
)
|
||||
|
||||
# Mark the event as redacted
|
||||
@@ -63,15 +58,12 @@ def prune_event(event):
|
||||
return pruned_event
|
||||
|
||||
|
||||
def prune_event_dict(event_dict):
|
||||
def prune_event_dict(room_version: RoomVersion, event_dict: dict) -> dict:
|
||||
"""Redacts the event_dict in the same way as `prune_event`, except it
|
||||
operates on dicts rather than event objects
|
||||
|
||||
Args:
|
||||
event_dict (dict)
|
||||
|
||||
Returns:
|
||||
dict: A copy of the pruned event dict
|
||||
A copy of the pruned event dict
|
||||
"""
|
||||
|
||||
allowed_keys = [
|
||||
@@ -118,7 +110,7 @@ def prune_event_dict(event_dict):
|
||||
"kick",
|
||||
"redact",
|
||||
)
|
||||
elif event_type == EventTypes.Aliases:
|
||||
elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
|
||||
add_fields("aliases")
|
||||
elif event_type == EventTypes.RoomHistoryVisibility:
|
||||
add_fields("history_visibility")
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user