Compare commits


3 Commits

Author SHA1 Message Date
Erik Johnston  930f5b0a25  Use *args  2024-10-04 16:21:40 +01:00
Erik Johnston  d56af0d83c  Fixup  2024-10-04 15:47:27 +01:00
Erik Johnston  5039693b0a  Run extensions in parallel  2024-10-04 15:43:46 +01:00
41 changed files with 243 additions and 636 deletions

View File

@@ -1,50 +1,3 @@
# Synapse 1.117.0 (2024-10-15)
No significant changes since 1.117.0rc1.
# Synapse 1.117.0rc1 (2024-10-08)
### Features
- Add config option `redis.password_path`. ([\#17717](https://github.com/element-hq/synapse/issues/17717))
### Bugfixes
- Fix a rare bug introduced in v1.29.0 where invalidating a user's access token from a worker could raise an error. ([\#17779](https://github.com/element-hq/synapse/issues/17779))
- In the response to `GET /_matrix/client/versions`, set the `unstable_features` flag for [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140) to `false` when server configuration disables support for delayed events. ([\#17780](https://github.com/element-hq/synapse/issues/17780))
- Improve input validation and room membership checks in admin redaction API. ([\#17792](https://github.com/element-hq/synapse/issues/17792))
### Improved Documentation
- Clarify the docstring of `test_forget_when_not_left`. ([\#17628](https://github.com/element-hq/synapse/issues/17628))
- Add documentation note about PYTHONMALLOC for accurate jemalloc memory tracking. Contributed by @hensg. ([\#17709](https://github.com/element-hq/synapse/issues/17709))
- Remove spurious "TODO UPDATE ALL THIS" note in the Debian installation docs. ([\#17749](https://github.com/element-hq/synapse/issues/17749))
- Explain how load balancing works for `federation_sender_instances`. ([\#17776](https://github.com/element-hq/synapse/issues/17776))
### Internal Changes
- Minor performance increase for large accounts using sliding sync. ([\#17751](https://github.com/element-hq/synapse/issues/17751))
- Increase performance of the notifier when there are many syncing users. ([\#17765](https://github.com/element-hq/synapse/issues/17765), [\#17766](https://github.com/element-hq/synapse/issues/17766))
- Fix performance of streams that don't change often. ([\#17767](https://github.com/element-hq/synapse/issues/17767))
- Improve performance of sliding sync connections that do not ask for any rooms. ([\#17768](https://github.com/element-hq/synapse/issues/17768))
- Reduce overhead of sliding sync E2EE loops. ([\#17771](https://github.com/element-hq/synapse/issues/17771))
- Sliding sync minor performance speed up using new table. ([\#17787](https://github.com/element-hq/synapse/issues/17787))
- Sliding sync minor performance improvement by omitting unchanged data from incremental responses. ([\#17788](https://github.com/element-hq/synapse/issues/17788))
- Speed up sliding sync when there are many active subscriptions. ([\#17789](https://github.com/element-hq/synapse/issues/17789))
- Add missing license headers on new source files. ([\#17799](https://github.com/element-hq/synapse/issues/17799))
### Updates to locked dependencies
* Bump phonenumbers from 8.13.45 to 8.13.46. ([\#17773](https://github.com/element-hq/synapse/issues/17773))
* Bump python-multipart from 0.0.10 to 0.0.12. ([\#17772](https://github.com/element-hq/synapse/issues/17772))
* Bump regex from 1.10.6 to 1.11.0. ([\#17770](https://github.com/element-hq/synapse/issues/17770))
* Bump ruff from 0.6.7 to 0.6.8. ([\#17774](https://github.com/element-hq/synapse/issues/17774))
# Synapse 1.116.0 (2024-10-01)
No significant changes since 1.116.0rc2.

1
changelog.d/17749.doc Normal file
View File

@@ -0,0 +1 @@
Remove spurious "TODO UPDATE ALL THIS" note in the Debian installation docs.

1
changelog.d/17751.misc Normal file
View File

@@ -0,0 +1 @@
Minor performance increase for large accounts using sliding sync.

1
changelog.d/17765.misc Normal file
View File

@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.

1
changelog.d/17766.misc Normal file
View File

@@ -0,0 +1 @@
Increase performance of the notifier when there are many syncing users.

1
changelog.d/17767.misc Normal file
View File

@@ -0,0 +1 @@
Fix performance of streams that don't change often.

1
changelog.d/17768.misc Normal file
View File

@@ -0,0 +1 @@
Improve performance of sliding sync connections that do not ask for any rooms.

1
changelog.d/17771.misc Normal file
View File

@@ -0,0 +1 @@
Reduce overhead of sliding sync E2EE loops.

12
debian/changelog vendored
View File

@@ -1,15 +1,3 @@
matrix-synapse-py3 (1.117.0) stable; urgency=medium
* New Synapse release 1.117.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 15 Oct 2024 10:46:30 +0100
matrix-synapse-py3 (1.117.0~rc1) stable; urgency=medium
* New Synapse release 1.117.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 08 Oct 2024 14:37:11 +0100
matrix-synapse-py3 (1.116.0) stable; urgency=medium
* New Synapse release 1.116.0.

View File

@@ -255,8 +255,6 @@ line to `/etc/default/matrix-synapse`:
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
*Note*: You may need to set `PYTHONMALLOC=malloc` to ensure that `jemalloc` can accurately calculate memory usage. By default, Python uses its internal small-object allocator, which may interfere with jemalloc's ability to track memory consumption correctly. This could prevent the [cache_autotuning](../configuration/config_documentation.md#caches-and-associated-values) feature from functioning as expected, as the Python allocator may not reach the memory threshold set by `max_cache_memory_usage`, thus not triggering the cache eviction process.
This made a significant difference on Python 2.7 - it's unclear how
much of an improvement it provides on Python 3.x.
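Putting the two settings referenced above together, the environment file could plausibly end up with both lines (a sketch; the jemalloc path varies by distribution and architecture):

```
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
PYTHONMALLOC=malloc
```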

View File

@@ -4370,12 +4370,6 @@ a `federation_sender_instances` map. Doing so will remove handling of this funct
the main process. Multiple workers can be added to this map, in which case the work is
balanced across them.
The way that the load balancing works is any outbound federation request will be assigned
to a federation sender worker based on the hash of the destination server name. This
means that all requests being sent to the same destination will be processed by the same
worker instance. Multiple `federation_sender_instances` are useful if there is a federation
with multiple servers.
This configuration setting must be shared between all workers handling federation
sending, and if changed all federation sender workers must be stopped at the same time
and then started, to ensure that all instances are running with the same config (otherwise
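To illustrate the load-balancing scheme described in the removed documentation paragraph above: each outbound destination is hashed onto one of the configured sender instances, so a given remote server is always handled by the same worker. A minimal sketch (the hash function and instance names here are assumptions for illustration, not Synapse's actual implementation):

```python
import zlib
from typing import List

def choose_sender(destination: str, instances: List[str]) -> str:
    # Deterministic: the same destination always maps to the same worker.
    index = zlib.crc32(destination.encode("utf-8")) % len(instances)
    return instances[index]

senders = ["federation_sender1", "federation_sender2"]
print(choose_sender("matrix.org", senders))   # always the same instance
print(choose_sender("example.com", senders))  # may be a different instance
```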
@@ -4524,9 +4518,6 @@ This setting has the following sub-options:
* `path`: The full path to a local Unix socket file. **If this is used, `host` and
`port` are ignored.** Defaults to `/tmp/redis.sock`
* `password`: Optional password if configured on the Redis instance.
* `password_path`: Alternative to `password`, reading the password from an
external file. The file should be a plain text file, containing only the
password. Synapse reads the password from the given file once at startup.
* `dbid`: Optional redis dbid if needs to connect to specific redis logical db.
* `use_tls`: Whether to use tls connection. Defaults to false.
* `certificate_file`: Optional path to the certificate file
@@ -4540,16 +4531,13 @@ This setting has the following sub-options:
_Changed in Synapse 1.85.0: Added path option to use a local Unix socket_
_Changed in Synapse 1.116.0: Added password\_path_
Example configuration:
```yaml
redis:
enabled: true
host: localhost
port: 6379
password_path: <path_to_the_password_file>
# OR password: <secret_password>
password: <secret_password>
dbid: <dbid>
#use_tls: True
#certificate_file: <path_to_the_certificate_file>
```

6
poetry.lock generated
View File

@@ -1974,13 +1974,13 @@ six = ">=1.5"
[[package]]
name = "python-multipart"
version = "0.0.12"
version = "0.0.10"
description = "A streaming multipart parser for Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "python_multipart-0.0.12-py3-none-any.whl", hash = "sha256:43dcf96cf65888a9cd3423544dd0d75ac10f7aa0c3c28a175bbcd00c9ce1aebf"},
{file = "python_multipart-0.0.10-py3-none-any.whl", hash = "sha256:2b06ad9e8d50c7a8db80e3b56dab590137b323410605af2be20d62a5f1ba1dc8"},
{file = "python_multipart-0.0.12.tar.gz", hash = "sha256:045e1f98d719c1ce085ed7f7e1ef9d8ccc8c02ba02b5566d5f7521410ced58cb"},
{file = "python_multipart-0.0.10.tar.gz", hash = "sha256:46eb3c6ce6fdda5fb1a03c7e11d490e407c6930a2703fe7aef4da71c374688fa"},
]
[[package]]

View File

@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.117.0"
version = "1.116.0"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"

View File

@@ -3,7 +3,7 @@
#
# Copyright 2020 The Matrix.org Foundation C.I.C.
# Copyright 2016 OpenMarket Ltd
# Copyright (C) 2023-2024 New Vector, Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as

View File

@@ -21,15 +21,10 @@
from typing import Any
from synapse.config._base import Config, ConfigError, read_file
from synapse.config._base import Config
from synapse.types import JsonDict
from synapse.util.check_dependencies import check_requirements
CONFLICTING_PASSWORD_OPTS_ERROR = """\
You have configured both `redis.password` and `redis.password_path`.
These are mutually incompatible.
"""
class RedisConfig(Config):
section = "redis"
@@ -48,17 +43,6 @@ class RedisConfig(Config):
self.redis_path = redis_config.get("path", None)
self.redis_dbid = redis_config.get("dbid", None)
self.redis_password = redis_config.get("password")
redis_password_path = redis_config.get("password_path")
if redis_password_path:
if self.redis_password:
raise ConfigError(CONFLICTING_PASSWORD_OPTS_ERROR)
self.redis_password = read_file(
redis_password_path,
(
"redis",
"password_path",
),
).strip()
self.redis_use_tls = redis_config.get("use_tls", False)
self.redis_certificate = redis_config.get("certificate_file", None)

View File

@@ -443,8 +443,8 @@ class AdminHandler:
["m.room.member", "m.room.message"], ["m.room.member", "m.room.message"],
) )
if not event_ids: if not event_ids:
# nothing to redact in this room # there's nothing to redact
continue return TaskStatus.COMPLETE, result, None
events = await self._store.get_events_as_list(event_ids) events = await self._store.get_events_as_list(event_ids)
for event in events: for event in events:

View File

@@ -1,17 +1,3 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import logging
from typing import TYPE_CHECKING, List, Optional, Set, Tuple

View File

@@ -49,7 +49,6 @@ from synapse.types import (
Requester,
SlidingSyncStreamToken,
StateMap,
StrCollection,
StreamKeyType,
StreamToken,
)
@@ -294,6 +293,7 @@ class SlidingSyncHandler:
# to record rooms as having updates even if there might not actually
# be anything new for the user (e.g. due to event filters, events
# having happened after the user left, etc).
unsent_room_ids = []
if from_token:
# The set of rooms that the client (may) care about, but aren't
# in any list range (or subscribed to).
@@ -305,15 +305,6 @@ class SlidingSyncHandler:
# TODO: Replace this with something faster. When we land the
# sliding sync tables that record the most recent event
# positions we can use that.
unsent_room_ids: StrCollection
if await self.store.have_finished_sliding_sync_background_jobs():
unsent_room_ids = await (
self.store.get_rooms_that_have_updates_since_sliding_sync_table(
room_ids=missing_rooms,
from_key=from_token.stream_token.room_key,
)
)
else:
missing_event_map_by_room = (
await self.store.get_room_events_stream_for_rooms(
room_ids=missing_rooms,
@@ -1057,42 +1048,22 @@ class SlidingSyncHandler:
)
)
# Figure out the last bump event in the room. If the bump stamp hasn't
# changed we omit it from the response.
bump_stamp = None
always_return_bump_stamp = (
# We use the membership event position for any non-join
room_membership_for_user_at_to_token.membership != Membership.JOIN
# We didn't fetch any timeline events but we should still check for
# a bump_stamp that might be somewhere
or limited is None
# There might be a bump event somewhere before the timeline events
# that we fetched, that we didn't previously send down
or limited is True
# Always give the client some frame of reference if this is the
# first time they are seeing the room down the connection
or initial
)
# Figure out the last bump event in the room
#
# By default, just choose the membership event position for any non-join membership
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
# If we're joined to the room, we need to find the last bump event before the
# `to_token`
if room_membership_for_user_at_to_token.membership == Membership.JOIN:
# Try and get a bump stamp
# Try and get a bump stamp, if not we just fall back to the
# membership token.
new_bump_stamp = await self._get_bump_stamp(
room_id,
to_token,
timeline_events,
check_outside_timeline=always_return_bump_stamp,
room_id, to_token, timeline_events
)
if new_bump_stamp is not None:
bump_stamp = new_bump_stamp
if bump_stamp is None and always_return_bump_stamp:
# By default, just choose the membership event position for any non-join membership
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
if bump_stamp is not None and bump_stamp < 0:
if bump_stamp < 0:
# We never want to send down negative stream orderings, as you can't
# sensibly compare positive and negative stream orderings (they have
# different meanings).
@@ -1185,23 +1156,14 @@ class SlidingSyncHandler:
@trace
async def _get_bump_stamp(
self,
room_id: str,
to_token: StreamToken,
timeline: List[EventBase],
check_outside_timeline: bool,
self, room_id: str, to_token: StreamToken, timeline: List[EventBase]
) -> Optional[int]:
"""Get a bump stamp for the room, if we have a bump event and it has
changed.
"""Get a bump stamp for the room, if we have a bump event
Args:
room_id
to_token: The upper bound of token to return
timeline: The list of events we have fetched.
limited: If the timeline was limited.
check_outside_timeline: Whether we need to check for bump stamp for
events before the timeline if we didn't find a bump stamp in
the timeline events.
"""
# First check the timeline events we're returning to see if one of
@@ -1221,11 +1183,6 @@ class SlidingSyncHandler:
if new_bump_stamp > 0:
return new_bump_stamp
if not check_outside_timeline:
# If we are not a limited sync, then we know the bump stamp can't
# have changed.
return None
# We can quickly query for the latest bump event in the room using the
# sliding sync tables.
latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room(

View File

@@ -49,7 +49,10 @@ from synapse.types.handlers.sliding_sync import (
SlidingSyncConfig,
SlidingSyncResult,
)
from synapse.util.async_helpers import concurrently_execute
from synapse.util.async_helpers import (
concurrently_execute,
gather_optional_coroutines,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -97,26 +100,26 @@ class SlidingSyncExtensionHandler:
if sync_config.extensions is None:
return SlidingSyncResult.Extensions()
to_device_response = None
to_device_coro = None
if sync_config.extensions.to_device is not None:
to_device_response = await self.get_to_device_extension_response(
to_device_coro = self.get_to_device_extension_response(
sync_config=sync_config,
to_device_request=sync_config.extensions.to_device,
to_token=to_token,
)
e2ee_response = None
e2ee_coro = None
if sync_config.extensions.e2ee is not None:
e2ee_response = await self.get_e2ee_extension_response(
e2ee_coro = self.get_e2ee_extension_response(
sync_config=sync_config,
e2ee_request=sync_config.extensions.e2ee,
to_token=to_token,
from_token=from_token,
)
account_data_response = None
account_data_coro = None
if sync_config.extensions.account_data is not None:
account_data_response = await self.get_account_data_extension_response(
account_data_coro = self.get_account_data_extension_response(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
@@ -127,9 +130,9 @@ class SlidingSyncExtensionHandler:
from_token=from_token,
)
receipts_response = None
receipts_coro = None
if sync_config.extensions.receipts is not None:
receipts_response = await self.get_receipts_extension_response(
receipts_coro = self.get_receipts_extension_response(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
@@ -141,9 +144,9 @@ class SlidingSyncExtensionHandler:
from_token=from_token,
)
typing_response = None
typing_coro = None
if sync_config.extensions.typing is not None:
typing_response = await self.get_typing_extension_response(
typing_coro = self.get_typing_extension_response(
sync_config=sync_config,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
@@ -153,6 +156,20 @@ class SlidingSyncExtensionHandler:
from_token=from_token,
)
(
to_device_response,
e2ee_response,
account_data_response,
receipts_response,
typing_response,
) = await gather_optional_coroutines(
to_device_coro,
e2ee_coro,
account_data_coro,
receipts_coro,
typing_coro,
)
return SlidingSyncResult.Extensions(
to_device=to_device_response,
e2ee=e2ee_response,

View File

@@ -500,16 +500,6 @@ class SlidingSyncRoomLists:
# depending on the `required_state` requested (see below).
partial_state_rooms = await self.store.get_partial_rooms()
# Fetch any rooms that we have not already fetched from the database.
subscription_sliding_sync_rooms = (
await self.store.get_sliding_sync_room_for_user_batch(
user_id,
sync_config.room_subscriptions.keys()
- room_membership_for_user_map.keys(),
)
)
room_membership_for_user_map.update(subscription_sliding_sync_rooms)
for (
room_id,
room_subscription,
@@ -517,11 +507,17 @@ class SlidingSyncRoomLists:
# Check if we have a membership for the room, but didn't pull it out
# above. This could be e.g. a leave that we don't pull out by
# default.
current_room_entry = room_membership_for_user_map.get(room_id)
current_room_entry = (
await self.store.get_sliding_sync_room_for_user(
user_id, room_id
)
)
if not current_room_entry:
# TODO: Handle rooms the user isn't in.
continue
room_membership_for_user_map[room_id] = current_room_entry
all_rooms.add(room_id)
# Take the superset of the `RoomSyncConfig` for each room.

View File

@@ -1039,7 +1039,7 @@ class _MultipartParserProtocol(protocol.Protocol):
self.deferred = deferred
self.boundary = boundary
self.max_length = max_length
self.parser: Optional[multipart.MultipartParser] = None
self.parser = None
self.multipart_response = MultipartResponse()
self.has_redirect = False
self.in_json = False
@@ -1097,7 +1097,7 @@ class _MultipartParserProtocol(protocol.Protocol):
self.deferred.errback()
self.file_length += end - start
callbacks: "multipart.multipart.MultipartCallbacks" = {
callbacks = {
"on_header_field": on_header_field,
"on_header_value": on_header_value,
"on_part_data": on_part_data,
@@ -1113,7 +1113,7 @@ class _MultipartParserProtocol(protocol.Protocol):
self.transport.abortConnection()
try:
self.parser.write(incoming_data)
self.parser.write(incoming_data)  # type: ignore[attr-defined]
except Exception as e:
logger.warning(f"Exception writing to multipart parser: {e}")
self.deferred.errback()

View File

@@ -37,6 +37,7 @@ import warnings
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Optional,
@@ -850,6 +851,32 @@ def run_in_background(
return d
def run_coroutine_in_background(
coroutine: typing.Coroutine[Any, Any, R],
) -> "defer.Deferred[R]":
current = current_context()
d = defer.ensureDeferred(coroutine)
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
return d
T = TypeVar("T")

View File

@@ -1,7 +1,7 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023-2024 New Vector, Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as

View File

@@ -1,17 +1,3 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import logging
from typing import TYPE_CHECKING, Dict, Optional, Tuple

View File

@@ -48,7 +48,7 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
""" """
NAME = "remove_pusher" NAME = "add_user_account_data"
PATH_ARGS = ("user_id",) PATH_ARGS = ("user_id",)
CACHE = False CACHE = False

View File

@@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
import attr
from synapse._pydantic_compat import StrictBool, StrictInt, StrictStr
from synapse._pydantic_compat import StrictBool
from synapse.api.constants import Direction, UserTypes
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
@@ -1421,39 +1421,40 @@ class RedactUser(RestServlet):
self._store = hs.get_datastores().main
self.admin_handler = hs.get_admin_handler()
class PostBody(RequestBodyModel):
rooms: List[StrictStr]
reason: Optional[StrictStr]
limit: Optional[StrictInt]
async def on_POST(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester)
# parse provided user id to check that it is valid
UserID.from_string(user_id)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
body = parse_json_object_from_request(request, allow_empty_body=True)
rooms = body.get("rooms")
if rooms is None:
raise SynapseError(
HTTPStatus.BAD_REQUEST, "Must provide a value for rooms."
)
reason = body.get("reason")
if reason:
if not isinstance(reason, str):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"If a reason is provided it must be a string.",
)
limit = body.limit
if limit and limit <= 0:
limit = body.get("limit")
if limit:
if not isinstance(limit, int) or limit <= 0:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"If limit is provided it must be a non-negative integer greater than 0.",
)
rooms = body.rooms
if not rooms:
current_rooms = list(await self._store.get_rooms_for_user(user_id))
banned_rooms = list(
await self._store.get_rooms_user_currently_banned_from(user_id)
)
rooms = current_rooms + banned_rooms
rooms = await self._store.get_rooms_for_user(user_id)
redact_id = await self.admin_handler.start_redact_events(
user_id, rooms, requester.serialize(), body.reason, limit
user_id, list(rooms), requester.serialize(), reason, limit
)
return HTTPStatus.OK, {"redact_id": redact_id}
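For reference, both versions of the handler above parse the same request shape on `POST /_synapse/admin/v1/user/<user_id>/redact`; an illustrative body (field names taken from the code above, values made up) is:

```python
# Illustrative request body for the admin redaction endpoint above.
# An empty "rooms" list means "all rooms the user is joined to (or, in the
# stricter version, also banned from)"; "reason" and "limit" are optional.
body = {
    "rooms": [],
    "reason": "spam",
    "limit": 1000,  # must be a positive integer if provided
}
print(body)
```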

View File

@@ -1,17 +1,3 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# This module contains REST servlets to do with delayed events: /delayed_events/<paths>
import logging

View File

@@ -1010,13 +1010,11 @@ class SlidingSyncRestServlet(RestServlet):
serialized_rooms: Dict[str, JsonDict] = {}
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"bump_stamp": room_result.bump_stamp,
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
}
if room_result.bump_stamp is not None:
serialized_rooms[room_id]["bump_stamp"] = room_result.bump_stamp
if room_result.joined_count is not None:
serialized_rooms[room_id]["joined_count"] = room_result.joined_count

View File

@@ -172,7 +172,7 @@ class VersionsRestServlet(RestServlet):
)
),
# MSC4140: Delayed events
"org.matrix.msc4140": bool(self.config.server.max_event_delay_ms),
"org.matrix.msc4140": True,
# MSC4151: Report room API (Client-Server API)
"org.matrix.msc4151": self.config.experimental.msc4151_enabled,
# Simplified sliding sync

View File

@@ -1,17 +1,3 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import logging
from typing import List, NewType, Optional, Tuple

View File

@@ -711,27 +711,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
return {row[0] for row in txn}
async def get_rooms_user_currently_banned_from(
self, user_id: str
) -> FrozenSet[str]:
"""Returns a set of room_ids the user is currently banned from.
If a remote user only returns rooms this server is currently
participating in.
"""
room_ids = await self.db_pool.simple_select_onecol(
table="current_state_events",
keyvalues={
"type": EventTypes.Member,
"membership": Membership.BAN,
"state_key": user_id,
},
retcol="room_id",
desc="get_rooms_user_currently_banned_from",
)
return frozenset(room_ids)
@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]:
"""Returns a set of room_ids the user is currently joined to.
@@ -1520,57 +1499,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn "get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn
) )
async def get_sliding_sync_room_for_user_batch(
self, user_id: str, room_ids: StrCollection
) -> Dict[str, RoomsForUserSlidingSync]:
"""Get the sliding sync room entry for the given user and rooms."""
if not room_ids:
return {}
def get_sliding_sync_room_for_user_batch_txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
clause, args = make_in_list_sql_clause(
self.database_engine, "m.room_id", room_ids
)
sql = f"""
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
m.event_instance_name, m.event_stream_ordering,
m.has_known_state,
COALESCE(j.room_type, m.room_type),
COALESCE(j.is_encrypted, m.is_encrypted)
FROM sliding_sync_membership_snapshots AS m
INNER JOIN rooms AS r USING (room_id)
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE m.forgotten = 0
AND {clause}
AND user_id = ?
"""
args.append(user_id)
txn.execute(sql, args)
return {
row[0]: RoomsForUserSlidingSync(
room_id=row[0],
sender=row[1],
membership=row[2],
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
has_known_state=bool(row[7]),
room_type=row[8],
is_encrypted=row[9],
)
for row in txn
}
return await self.db_pool.runInteraction(
"get_sliding_sync_room_for_user_batch",
get_sliding_sync_room_for_user_batch_txn,
)
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(

View File

@@ -751,48 +751,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if self._events_stream_cache.has_entity_changed(room_id, from_id)
}
async def get_rooms_that_have_updates_since_sliding_sync_table(
self,
room_ids: StrCollection,
from_key: RoomStreamToken,
) -> StrCollection:
"""Return the rooms that probably have had updates since the given
token (changes that are > `from_key`)."""
# If the stream change cache is valid for the stream token, we can just
# use the result of that.
if from_key.stream >= self._events_stream_cache.get_earliest_known_position():
return self._events_stream_cache.get_entities_changed(
room_ids, from_key.stream
)
def get_rooms_that_have_updates_since_sliding_sync_table_txn(
txn: LoggingTransaction,
) -> StrCollection:
sql = """
SELECT room_id
FROM sliding_sync_joined_rooms
WHERE {clause}
AND event_stream_ordering > ?
"""
results: Set[str] = set()
for batch in batch_iter(room_ids, 1000):
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", batch
)
args.append(from_key.stream)
txn.execute(sql.format(clause=clause), args)
results.update(row[0] for row in txn)
return results
return await self.db_pool.runInteraction(
"get_rooms_that_have_updates_since_sliding_sync_table",
get_rooms_that_have_updates_since_sliding_sync_table_txn,
)
async def paginate_room_events_by_stream_ordering(
self,
*,

View File

@@ -1,16 +1,3 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
CREATE TABLE delayed_events (
delay_id TEXT NOT NULL,
user_localpart TEXT NOT NULL,

View File

@@ -158,7 +158,6 @@ class SlidingSyncResult:
name changes to mark the room as unread and bump it to the top. For
encrypted rooms, we just have to consider any activity as a bump because we
can't see the content and the client has to figure it out for themselves.
This may not be included if there hasn't been a change.
joined_count: The number of users with membership of join, including the client's
own user ID. (same as sync v2 `m.joined_member_count`)
invited_count: The number of users with membership of invite. (same as sync v2
@@ -194,7 +193,7 @@ class SlidingSyncResult:
limited: Optional[bool]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
num_live: Optional[int]
bump_stamp: Optional[int]
bump_stamp: int
joined_count: Optional[int]
invited_count: Optional[int]
notification_count: int

View File

@@ -51,7 +51,7 @@ from typing import (
)
import attr
from typing_extensions import Concatenate, Literal, ParamSpec
from typing_extensions import Concatenate, Literal, ParamSpec, Unpack
from twisted.internet import defer
from twisted.internet.defer import CancelledError
@@ -61,6 +61,7 @@ from twisted.python.failure import Failure
from synapse.logging.context import (
PreserveLoggingContext,
make_deferred_yieldable,
run_coroutine_in_background,
run_in_background,
)
from synapse.util import Clock
@@ -344,6 +345,7 @@ T1 = TypeVar("T1")
T2 = TypeVar("T2") T2 = TypeVar("T2")
T3 = TypeVar("T3") T3 = TypeVar("T3")
T4 = TypeVar("T4") T4 = TypeVar("T4")
T5 = TypeVar("T5")
@overload
@@ -402,6 +404,98 @@ def gather_results( # type: ignore[misc]
return deferred.addCallback(tuple)
@overload
async def gather_optional_coroutines(
*coroutines: Unpack[Tuple[Optional[Coroutine[Any, Any, T1]]]],
) -> Tuple[Optional[T1]]: ...
@overload
async def gather_optional_coroutines(
*coroutines: Unpack[
Tuple[
Optional[Coroutine[Any, Any, T1]],
Optional[Coroutine[Any, Any, T2]],
]
],
) -> Tuple[Optional[T1], Optional[T2]]: ...
@overload
async def gather_optional_coroutines(
*coroutines: Unpack[
Tuple[
Optional[Coroutine[Any, Any, T1]],
Optional[Coroutine[Any, Any, T2]],
Optional[Coroutine[Any, Any, T3]],
]
],
) -> Tuple[Optional[T1], Optional[T2], Optional[T3]]: ...
@overload
async def gather_optional_coroutines(
*coroutines: Unpack[
Tuple[
Optional[Coroutine[Any, Any, T1]],
Optional[Coroutine[Any, Any, T2]],
Optional[Coroutine[Any, Any, T3]],
Optional[Coroutine[Any, Any, T4]],
]
],
) -> Tuple[Optional[T1], Optional[T2], Optional[T3], Optional[T4]]: ...
@overload
async def gather_optional_coroutines(
*coroutines: Unpack[
Tuple[
Optional[Coroutine[Any, Any, T1]],
Optional[Coroutine[Any, Any, T2]],
Optional[Coroutine[Any, Any, T3]],
Optional[Coroutine[Any, Any, T4]],
Optional[Coroutine[Any, Any, T5]],
]
],
) -> Tuple[Optional[T1], Optional[T2], Optional[T3], Optional[T4], Optional[T5]]: ...
async def gather_optional_coroutines(
*coroutines: Unpack[Tuple[Optional[Coroutine[Any, Any, T1]], ...]],
) -> Tuple[Optional[T1], ...]:
try:
results = await make_deferred_yieldable(
defer.gatherResults(
[
run_coroutine_in_background(coroutine)
for coroutine in coroutines
if coroutine
],
consumeErrors=True,
)
)
results_iter = iter(results)
return tuple(
next(results_iter) if coroutine else None for coroutine in coroutines
)
except defer.FirstError as dfe:
# unwrap the error from defer.gatherResults.
# The raised exception's traceback only includes func() etc if
# the 'await' happens before the exception is thrown - ie if the failure
# happens *asynchronously* - otherwise Twisted throws away the traceback as it
# could be large.
#
# We could maybe reconstruct a fake traceback from Failure.frames. Or maybe
# we could throw Twisted into the fires of Mordor.
# suppress exception chaining, because the FirstError doesn't tell us anything
# very interesting.
assert isinstance(dfe.subFailure.value, BaseException)
raise dfe.subFailure.value from None
@attr.s(slots=True, auto_attribs=True)
class _LinearizerEntry:
# The number of things executing.
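As an aside on the semantics of `gather_optional_coroutines` above: the non-`None` coroutines run concurrently, and each `None` argument comes back as `None` in the result tuple. A standalone analogue of that behaviour, written with plain `asyncio` rather than Twisted so it can run outside Synapse:

```python
import asyncio
from typing import Any, Coroutine, Optional, Tuple

async def gather_optional(*coros: Optional[Coroutine[Any, Any, Any]]) -> Tuple[Any, ...]:
    # Schedule only the coroutines that were supplied, preserving positions.
    tasks = [asyncio.ensure_future(c) for c in coros if c is not None]
    results = iter(await asyncio.gather(*tasks))
    return tuple(next(results) if c is not None else None for c in coros)

async def main() -> None:
    async def double(x: int) -> int:
        await asyncio.sleep(0)
        return x * 2

    print(await gather_optional(double(1), None, double(3)))  # (2, None, 6)

asyncio.run(main())
```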

View File

@@ -19,23 +19,13 @@
# [This file includes modifications made by New Vector Limited]
#
#
import tempfile
from typing import Callable
import yaml
from parameterized import parameterized
from synapse.config import ConfigError
from synapse.config._base import RootConfig
from synapse.config.homeserver import HomeServerConfig
from tests.config.utils import ConfigFileTestCase
try:
import hiredis
except ImportError:
hiredis = None  # type: ignore
class ConfigLoadingFileTestCase(ConfigFileTestCase):
def test_load_fails_if_server_name_missing(self) -> None:
@@ -126,49 +116,3 @@ class ConfigLoadingFileTestCase(ConfigFileTestCase):
self.add_lines_to_config(["trust_identity_server_for_password_resets: true"])
with self.assertRaises(ConfigError):
HomeServerConfig.load_config("", ["-c", self.config_file])
@parameterized.expand(
[
"turn_shared_secret_path: /does/not/exist",
"registration_shared_secret_path: /does/not/exist",
*["redis:\n enabled: true\n password_path: /does/not/exist"]
* (hiredis is not None),
]
)
def test_secret_files_missing(self, config_str: str) -> None:
self.generate_config()
self.add_lines_to_config(["", config_str])
with self.assertRaises(ConfigError):
HomeServerConfig.load_config("", ["-c", self.config_file])
@parameterized.expand(
[
(
"turn_shared_secret_path: {}",
lambda c: c.voip.turn_shared_secret,
),
(
"registration_shared_secret_path: {}",
lambda c: c.registration.registration_shared_secret,
),
*[
(
"redis:\n enabled: true\n password_path: {}",
lambda c: c.redis.redis_password,
)
]
* (hiredis is not None),
]
)
def test_secret_files_existing(
self, config_line: str, get_secret: Callable[[RootConfig], str]
) -> None:
self.generate_config_and_remove_lines_containing("registration_shared_secret")
with tempfile.NamedTemporaryFile(buffering=0) as secret_file:
secret_file.write(b"53C237")
self.add_lines_to_config(["", config_line.format(secret_file.name)])
config = HomeServerConfig.load_config("", ["-c", self.config_file])
self.assertEqual(get_secret(config), "53C237")

View File

@@ -380,7 +380,7 @@ class RoomMemberMasterHandlerTestCase(HomeserverTestCase):
)
def test_forget_when_not_left(self) -> None:
"""Tests that a user cannot forget a room that they are still in."""
"""Tests that a user cannot not forgets a room that has not left."""
self.get_failure(self.handler.forget(self.alice_ID, self.room_id), SynapseError)
def test_nonlocal_room_user_action(self) -> None: def test_nonlocal_room_user_action(self) -> None:

View File

@@ -5288,26 +5288,19 @@ class UserRedactionTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(matched), len(rm2_originals))
def test_admin_redact_works_if_user_kicked_or_banned(self) -> None:
originals1 = []
originals2 = []
originals = []
for rm in [self.rm1, self.rm2, self.rm3]:
join = self.helper.join(rm, self.bad_user, tok=self.bad_user_tok)
if rm in [self.rm1, self.rm3]:
originals1.append(join["event_id"])
else:
originals2.append(join["event_id"])
originals.append(join["event_id"])
for i in range(5):
event = {"body": f"hello{i}", "msgtype": "m.text"}
res = self.helper.send_event(
rm, "m.room.message", event, tok=self.bad_user_tok
)
if rm in [self.rm1, self.rm3]:
originals1.append(res["event_id"])
else:
originals2.append(res["event_id"])
originals.append(res["event_id"])
# kick user from rooms 1 and 3
for r in [self.rm1, self.rm3]:
for r in [self.rm1, self.rm2]:
channel = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{r}/kick",
@@ -5337,70 +5330,32 @@ class UserRedactionTestCase(unittest.HomeserverTestCase):
failed_redactions = channel2.json_body.get("failed_redactions")
self.assertEqual(failed_redactions, {})
# double check
for rm in [self.rm1, self.rm3]:
filter = json.dumps({"types": [EventTypes.Redaction]})
channel3 = self.make_request(
"GET",
f"rooms/{rm}/messages?filter={filter}&limit=50",
access_token=self.admin_tok,
)
self.assertEqual(channel3.code, 200)
matches = []
for event in channel3.json_body["chunk"]:
for event_id in originals1:
if (
event["type"] == "m.room.redaction"
and event["redacts"] == event_id
):
matches.append((event_id, event))
# we redacted 6 messages
self.assertEqual(len(matches), 6)
# ban user from room 2
channel4 = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{self.rm2}/ban",
content={"reason": "being a bummer", "user_id": self.bad_user},
access_token=self.admin_tok,
)
self.assertEqual(channel4.code, HTTPStatus.OK, channel4.result)
# make a request to ban all user's messages
channel5 = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": []},
access_token=self.admin_tok,
)
self.assertEqual(channel5.code, 200)
id2 = channel5.json_body.get("redact_id")
# check that there were no failed redactions in room 2
channel6 = self.make_request(
"GET",
f"/_synapse/admin/v1/user/redact_status/{id2}",
access_token=self.admin_tok,
)
self.assertEqual(channel6.code, 200)
self.assertEqual(channel6.json_body.get("status"), "complete")
failed_redactions = channel6.json_body.get("failed_redactions")
self.assertEqual(failed_redactions, {})
# ban user
channel3 = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{self.rm2}/ban",
content={"reason": "being a bummer", "user_id": self.bad_user},
access_token=self.admin_tok,
)
self.assertEqual(channel3.code, HTTPStatus.OK, channel3.result)
# redact messages in room 2
channel4 = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": [self.rm2]},
access_token=self.admin_tok,
)
self.assertEqual(channel4.code, 200)
id2 = channel1.json_body.get("redact_id")
# check that there were no failed redactions in room 2
channel5 = self.make_request(
"GET",
f"/_synapse/admin/v1/user/redact_status/{id2}",
access_token=self.admin_tok,
)
self.assertEqual(channel5.code, 200)
self.assertEqual(channel5.json_body.get("status"), "complete")
failed_redactions = channel5.json_body.get("failed_redactions")
self.assertEqual(failed_redactions, {})
# double check messages in room 2 were redacted
filter = json.dumps({"types": [EventTypes.Redaction]})
channel7 = self.make_request(
"GET",
f"rooms/{self.rm2}/messages?filter={filter}&limit=50",
access_token=self.admin_tok,
)
self.assertEqual(channel7.code, 200)
matches = []
for event in channel7.json_body["chunk"]:
for event_id in originals2:
if event["type"] == "m.room.redaction" and event["redacts"] == event_id:
matches.append((event_id, event))
# we redacted 6 messages
self.assertEqual(len(matches), 6)

View File

@@ -1096,92 +1096,6 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
self.assertGreater(response_body["rooms"][room_id]["bump_stamp"], 0)
def test_rooms_bump_stamp_no_change_incremental(self) -> None:
"""Test that the bump stamp is omitted if there has been no change"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(
user1_id,
tok=user1_tok,
)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 100,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Initial sync so we expect to see a bump stamp
self.assertIn("bump_stamp", response_body["rooms"][room_id1])
# Send an event that is not in the bump events list
self.helper.send_event(
room_id1, type="org.matrix.test", content={}, tok=user1_tok
)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# There hasn't been a change to the bump stamps, so we ignore it
self.assertNotIn("bump_stamp", response_body["rooms"][room_id1])
def test_rooms_bump_stamp_change_incremental(self) -> None:
"""Test that the bump stamp is included if there has been a change, even
if its not in the timeline"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(
user1_id,
tok=user1_tok,
)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 2,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Initial sync so we expect to see a bump stamp
self.assertIn("bump_stamp", response_body["rooms"][room_id1])
first_bump_stamp = response_body["rooms"][room_id1]["bump_stamp"]
# Send a bump event at the start.
self.helper.send(room_id1, "test", tok=user1_tok)
# Send events that are not in the bump events list to fill the timeline
for _ in range(5):
self.helper.send_event(
room_id1, type="org.matrix.test", content={}, tok=user1_tok
)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# There was a bump event in the timeline gap, so we should see the bump
# stamp be updated.
self.assertIn("bump_stamp", response_body["rooms"][room_id1])
second_bump_stamp = response_body["rooms"][room_id1]["bump_stamp"]
self.assertGreater(second_bump_stamp, first_bump_stamp)
def test_rooms_bump_stamp_invites(self) -> None:
"""
Test that `bump_stamp` is present and points to the membership event,

View File

@@ -1,17 +1,3 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
"""Tests REST events for /delayed_events paths.""" """Tests REST events for /delayed_events paths."""
from http import HTTPStatus from http import HTTPStatus
@@ -22,12 +8,11 @@ from parameterized import parameterized
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.errors import Codes
from synapse.rest.client import delayed_events, room, versions
from synapse.rest.client import delayed_events, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from tests import unittest
from tests.unittest import HomeserverTestCase
PATH_PREFIX = "/_matrix/client/unstable/org.matrix.msc4140/delayed_events"
@@ -36,21 +21,6 @@ _HS_NAME = "red"
_EVENT_TYPE = "com.example.test"
class DelayedEventsUnstableSupportTestCase(HomeserverTestCase):
servlets = [versions.register_servlets]
def test_false_by_default(self) -> None:
channel = self.make_request("GET", "/_matrix/client/versions")
self.assertEqual(channel.code, 200, channel.result)
self.assertFalse(channel.json_body["unstable_features"]["org.matrix.msc4140"])
@unittest.override_config({"max_event_delay_duration": "24h"})
def test_true_if_enabled(self) -> None:
channel = self.make_request("GET", "/_matrix/client/versions")
self.assertEqual(channel.code, 200, channel.result)
self.assertTrue(channel.json_body["unstable_features"]["org.matrix.msc4140"])
class DelayedEventsTestCase(HomeserverTestCase):
"""Tests getting and managing delayed events."""

View File

@@ -4,7 +4,7 @@
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2017 Vector Creations Ltd
# Copyright 2014-2016 OpenMarket Ltd
# Copyright (C) 2023-2024 New Vector, Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as