mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-07 01:20:16 +00:00
Compare commits
23 Commits
dmr/help-e
...
dmr/rate-l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
80e2e0b4f2 | ||
|
|
5cf6700c2e | ||
|
|
fda6252233 | ||
|
|
47acf465ef | ||
|
|
8377172c4c | ||
|
|
81eb4ab86a | ||
|
|
e16294ef3b | ||
|
|
9d9253109b | ||
|
|
4da8f29ff6 | ||
|
|
dcb16831e8 | ||
|
|
121590a0c9 | ||
|
|
240e32f264 | ||
|
|
7a14b94698 | ||
|
|
0bb4122726 | ||
|
|
6b47e82ca2 | ||
|
|
4230112526 | ||
|
|
ae788ca796 | ||
|
|
77de15927a | ||
|
|
9d4cdae33a | ||
|
|
c594ab774b | ||
|
|
c2e3025b33 | ||
|
|
453f621d23 | ||
|
|
bd873e6571 |
1
changelog.d/13169.feature
Normal file
1
changelog.d/13169.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add per-room rate limiting for room joins. For each room, Synapse now monitors the rate of join events in that room and throttles additional joins if that rate grows too large.
|
||||
@@ -67,6 +67,10 @@ rc_joins:
|
||||
per_second: 9999
|
||||
burst_count: 9999
|
||||
|
||||
rc_joins_per_room:
|
||||
per_second: 9999
|
||||
burst_count: 9999
|
||||
|
||||
rc_3pid_validation:
|
||||
per_second: 1000
|
||||
burst_count: 1000
|
||||
|
||||
@@ -89,6 +89,18 @@ process, for example:
|
||||
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
```
|
||||
|
||||
# Upgrading to v1.63.0
|
||||
|
||||
## Changes to the event replication streams
|
||||
|
||||
Synapse now includes a flag indicating if an event is an outlier when
|
||||
replicating it to other workers. This is a forwards- and backwards-incompatible
|
||||
change: v1.62 and workers cannot process events replicated by v1.63 workers, and
|
||||
vice versa.
|
||||
|
||||
Once all workers are upgraded to v1.63 (or downgraded to v1.62), event
|
||||
replication will resume as normal.
|
||||
|
||||
# Upgrading to v1.62.0
|
||||
|
||||
## New signatures for spam checker callbacks
|
||||
|
||||
@@ -1379,6 +1379,25 @@ rc_joins:
|
||||
per_second: 0.03
|
||||
burst_count: 12
|
||||
```
|
||||
---
|
||||
### `rc_joins_per_room`
|
||||
|
||||
This option allows admins to ratelimit joins to a room based on the number of recent
|
||||
joins (local or remote) to that room. It is intended to mitigate mass-join spam
|
||||
waves which target multiple homeservers.
|
||||
|
||||
By default, one join is permitted to a room every second, with an accumulating
|
||||
buffer of up to ten instantaneous joins.
|
||||
|
||||
Example configuration (default values):
|
||||
```yaml
|
||||
rc_joins_per_room:
|
||||
per_second: 1
|
||||
burst_count: 10
|
||||
```
|
||||
|
||||
_Added in Synapse 1.63.0._
|
||||
|
||||
---
|
||||
### `rc_3pid_validation`
|
||||
|
||||
|
||||
@@ -27,6 +27,33 @@ class Ratelimiter:
|
||||
"""
|
||||
Ratelimit actions marked by arbitrary keys.
|
||||
|
||||
(Note that the source code speaks of "actions" and "burst_count" rather than
|
||||
"tokens" and a "bucket_size".)
|
||||
|
||||
This is a "leaky bucket as a meter". For each key to be tracked there is a bucket
|
||||
containing some number 0 <= T <= `burst_count` of tokens corresponding to previously
|
||||
permitted requests for that key. Each bucket starts empty, and gradually leaks
|
||||
tokens at a rate of `rate_hz`.
|
||||
|
||||
Upon an incoming request, we must determine:
|
||||
- the key that this request falls under (which bucket to inspect), and
|
||||
- the cost C of this request in tokens.
|
||||
Then, if there is room in the bucket for C tokens (T + C <= `burst_count`),
|
||||
the request is permitted and `cost` tokens are added to the bucket.
|
||||
Otherwise the request is denied, and the bucket continues to hold T tokens.
|
||||
|
||||
This means that the limiter enforces an average request frequency of `rate_hz`,
|
||||
while accumulating a buffer of up to `burst_count` requests which can be consumed
|
||||
instantaneously.
|
||||
|
||||
The tricky bit is the leaking. We do not want to have a periodic process which
|
||||
leaks every bucket! Instead, we track
|
||||
- the time point when the bucket was last completely empty, and
|
||||
- how many tokens have added to the bucket permitted since then.
|
||||
Then for each incoming request, we can calculate how many tokens have leaked
|
||||
since this time point, and use that to decide if we should accept or reject the
|
||||
request.
|
||||
|
||||
Args:
|
||||
clock: A homeserver clock, for retrieving the current time
|
||||
rate_hz: The long term number of actions that can be performed in a second.
|
||||
@@ -41,14 +68,36 @@ class Ratelimiter:
|
||||
self.burst_count = burst_count
|
||||
self.store = store
|
||||
|
||||
# A ordered dictionary keeping track of actions, when they were last
|
||||
# performed and how often. Each entry is a mapping from a key of arbitrary type
|
||||
# to a tuple representing:
|
||||
# * How many times an action has occurred since a point in time
|
||||
# * The point in time
|
||||
# * The rate_hz of this particular entry. This can vary per request
|
||||
# An ordered dictionary representing the token buckets tracked by this rate
|
||||
# limiter. Each entry maps a key of arbitrary type to a tuple representing:
|
||||
# * The number of tokens currently in the bucket,
|
||||
# * The time point when the bucket was last completely empty, and
|
||||
# * The rate_hz (leak rate) of this particular bucket.
|
||||
self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()
|
||||
|
||||
def _get_key(
|
||||
self, requester: Optional[Requester], key: Optional[Hashable]
|
||||
) -> Hashable:
|
||||
"""Use the requester's MXID as a fallback key if no key is provided.
|
||||
|
||||
Pulled out so that `can_do_action` and `record_action` are consistent.
|
||||
"""
|
||||
if key is None:
|
||||
if not requester:
|
||||
raise ValueError("Must supply at least one of `requester` or `key`")
|
||||
|
||||
key = requester.user.to_string()
|
||||
return key
|
||||
|
||||
def _get_action_counts(
|
||||
self, key: Hashable, time_now_s: float
|
||||
) -> Tuple[float, float, float]:
|
||||
"""Retrieve the action counts, with a fallback representing an empty bucket.
|
||||
|
||||
Pulled out so that `can_do_action` and `record_action` are consistent.
|
||||
"""
|
||||
return self.actions.get(key, (0.0, time_now_s, 0.0))
|
||||
|
||||
async def can_do_action(
|
||||
self,
|
||||
requester: Optional[Requester],
|
||||
@@ -88,11 +137,7 @@ class Ratelimiter:
|
||||
* The reactor timestamp for when the action can be performed next.
|
||||
-1 if rate_hz is less than or equal to zero
|
||||
"""
|
||||
if key is None:
|
||||
if not requester:
|
||||
raise ValueError("Must supply at least one of `requester` or `key`")
|
||||
|
||||
key = requester.user.to_string()
|
||||
key = self._get_key(requester, key)
|
||||
|
||||
if requester:
|
||||
# Disable rate limiting of users belonging to any AS that is configured
|
||||
@@ -121,7 +166,7 @@ class Ratelimiter:
|
||||
self._prune_message_counts(time_now_s)
|
||||
|
||||
# Check if there is an existing count entry for this key
|
||||
action_count, time_start, _ = self.actions.get(key, (0.0, time_now_s, 0.0))
|
||||
action_count, time_start, _ = self._get_action_counts(key, time_now_s)
|
||||
|
||||
# Check whether performing another action is allowed
|
||||
time_delta = time_now_s - time_start
|
||||
@@ -164,6 +209,37 @@ class Ratelimiter:
|
||||
|
||||
return allowed, time_allowed
|
||||
|
||||
def record_action(
|
||||
self,
|
||||
requester: Optional[Requester],
|
||||
key: Optional[Hashable] = None,
|
||||
n_actions: int = 1,
|
||||
_time_now_s: Optional[float] = None,
|
||||
) -> None:
|
||||
"""Record that an action(s) took place, even if they violate the rate limit.
|
||||
|
||||
This is useful for tracking the frequency of events that happen across
|
||||
federation which we still want to impose local rate limits on. For instance, if
|
||||
we are alice.com monitoring a particular room, we cannot prevent bob.com
|
||||
from joining users to that room. However, we can track the number of recent
|
||||
joins in the room and refuse to serve new joins ourselves if there have been too
|
||||
many in the room across both homeservers.
|
||||
|
||||
Args:
|
||||
requester: The requester that is doing the action, if any.
|
||||
key: An arbitrary key used to classify an action. Defaults to the
|
||||
requester's user ID.
|
||||
n_actions: The number of times the user wants to do this action. If the user
|
||||
cannot do all of the actions, the user's action count is not incremented
|
||||
at all.
|
||||
_time_now_s: The current time. Optional, defaults to the current time according
|
||||
to self.clock. Only used by tests.
|
||||
"""
|
||||
key = self._get_key(requester, key)
|
||||
time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
|
||||
action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s)
|
||||
self.actions[key] = (action_count + n_actions, time_start, rate_hz)
|
||||
|
||||
def _prune_message_counts(self, time_now_s: float) -> None:
|
||||
"""Remove message count entries that have not exceeded their defined
|
||||
rate_hz limit
|
||||
|
||||
@@ -112,6 +112,13 @@ class RatelimitConfig(Config):
|
||||
defaults={"per_second": 0.01, "burst_count": 10},
|
||||
)
|
||||
|
||||
# Track the rate of joins to a given room. If there are too many, temporarily
|
||||
# prevent local joins and remote joins via this server.
|
||||
self.rc_joins_per_room = RateLimitConfig(
|
||||
config.get("rc_joins_per_room", {}),
|
||||
defaults={"per_second": 1, "burst_count": 10},
|
||||
)
|
||||
|
||||
# Ratelimit cross-user key requests:
|
||||
# * For local requests this is keyed by the sending device.
|
||||
# * For requests received over federation this is keyed by the origin.
|
||||
|
||||
@@ -117,6 +117,7 @@ class FederationServer(FederationBase):
|
||||
self._federation_event_handler = hs.get_federation_event_handler()
|
||||
self.state = hs.get_state_handler()
|
||||
self._event_auth_handler = hs.get_event_auth_handler()
|
||||
self._room_member_handler = hs.get_room_member_handler()
|
||||
|
||||
self._state_storage_controller = hs.get_storage_controllers().state
|
||||
|
||||
@@ -620,6 +621,15 @@ class FederationServer(FederationBase):
|
||||
)
|
||||
raise IncompatibleRoomVersionError(room_version=room_version)
|
||||
|
||||
# Refuse the request if that room has seen too many joins recently.
|
||||
# This is in addition to the HS-level rate limiting applied by
|
||||
# BaseFederationServlet.
|
||||
# type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
|
||||
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
|
||||
requester=None,
|
||||
key=room_id,
|
||||
update=False,
|
||||
)
|
||||
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
|
||||
return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
|
||||
|
||||
@@ -654,6 +664,12 @@ class FederationServer(FederationBase):
|
||||
room_id: str,
|
||||
caller_supports_partial_state: bool = False,
|
||||
) -> Dict[str, Any]:
|
||||
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
|
||||
requester=None,
|
||||
key=room_id,
|
||||
update=False,
|
||||
)
|
||||
|
||||
event, context = await self._on_send_membership_event(
|
||||
origin, content, Membership.JOIN, room_id
|
||||
)
|
||||
|
||||
@@ -2063,6 +2063,10 @@ class FederationEventHandler:
|
||||
event, event_pos, max_stream_token, extra_users=extra_users
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
# TODO retrieve the previous state, and exclude join -> join transitions
|
||||
self._notifier.notify_user_joined_room(event.event_id, event.room_id)
|
||||
|
||||
def _sanity_check_event(self, ev: EventBase) -> None:
|
||||
"""
|
||||
Do some early sanity checks of a received event
|
||||
|
||||
@@ -461,6 +461,7 @@ class EventCreationHandler:
|
||||
)
|
||||
self._events_shard_config = self.config.worker.events_shard_config
|
||||
self._instance_name = hs.get_instance_name()
|
||||
self._notifier = hs.get_notifier()
|
||||
|
||||
self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
|
||||
|
||||
@@ -1515,6 +1516,16 @@ class EventCreationHandler:
|
||||
requester, is_admin_redaction=is_admin_redaction
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
(
|
||||
current_membership,
|
||||
_,
|
||||
) = await self.store.get_local_current_membership_for_user_in_room(
|
||||
event.state_key, event.room_id
|
||||
)
|
||||
if current_membership != Membership.JOIN:
|
||||
self._notifier.notify_user_joined_room(event.event_id, event.room_id)
|
||||
|
||||
await self._maybe_kick_guest_users(event, context)
|
||||
|
||||
if event.type == EventTypes.CanonicalAlias:
|
||||
|
||||
@@ -94,12 +94,29 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
|
||||
burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
|
||||
)
|
||||
# Tracks joins from local users to rooms this server isn't a member of.
|
||||
# I.e. joins this server makes by requesting /make_join /send_join from
|
||||
# another server.
|
||||
self._join_rate_limiter_remote = Ratelimiter(
|
||||
store=self.store,
|
||||
clock=self.clock,
|
||||
rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
|
||||
burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
|
||||
)
|
||||
# TODO: find a better place to keep this Ratelimiter.
|
||||
# It needs to be
|
||||
# - written to by event persistence code
|
||||
# - written to by something which can snoop on replication streams
|
||||
# - read by the RoomMemberHandler to rate limit joins from local users
|
||||
# - read by the FederationServer to rate limit make_joins and send_joins from
|
||||
# other homeservers
|
||||
# I wonder if a homeserver-wide collection of rate limiters might be cleaner?
|
||||
self._join_rate_per_room_limiter = Ratelimiter(
|
||||
store=self.store,
|
||||
clock=self.clock,
|
||||
rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second,
|
||||
burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count,
|
||||
)
|
||||
|
||||
self._invites_per_room_limiter = Ratelimiter(
|
||||
store=self.store,
|
||||
@@ -122,6 +139,18 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
)
|
||||
|
||||
self.request_ratelimiter = hs.get_request_ratelimiter()
|
||||
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
|
||||
|
||||
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
|
||||
"""Notify the rate limiter that a room join has occurred.
|
||||
|
||||
Use this to inform the RoomMemberHandler about joins that have either
|
||||
- taken place on another homeserver, or
|
||||
- on another worker in this homeserver.
|
||||
Joins actioned by this worker should use the usual `ratelimit` method, which
|
||||
checks the limit and increments the counter in one go.
|
||||
"""
|
||||
self._join_rate_per_room_limiter.record_action(requester=None, key=room_id)
|
||||
|
||||
@abc.abstractmethod
|
||||
async def _remote_join(
|
||||
@@ -375,6 +404,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
# up blocking profile updates.
|
||||
if newly_joined and ratelimit:
|
||||
await self._join_rate_limiter_local.ratelimit(requester)
|
||||
await self._join_rate_per_room_limiter.ratelimit(
|
||||
requester, key=room_id, update=False
|
||||
)
|
||||
|
||||
result_event = await self.event_creation_handler.handle_new_client_event(
|
||||
requester,
|
||||
@@ -823,6 +855,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
await self._join_rate_limiter_remote.ratelimit(
|
||||
requester,
|
||||
)
|
||||
await self._join_rate_per_room_limiter.ratelimit(
|
||||
requester,
|
||||
key=room_id,
|
||||
update=False,
|
||||
)
|
||||
|
||||
inviter = await self._get_inviter(target.to_string(), room_id)
|
||||
if inviter and not self.hs.is_mine(inviter):
|
||||
|
||||
@@ -228,6 +228,7 @@ class Notifier:
|
||||
|
||||
# Called when there are new things to stream over replication
|
||||
self.replication_callbacks: List[Callable[[], None]] = []
|
||||
self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = []
|
||||
|
||||
self._federation_client = hs.get_federation_http_client()
|
||||
|
||||
@@ -280,6 +281,19 @@ class Notifier:
|
||||
"""
|
||||
self.replication_callbacks.append(cb)
|
||||
|
||||
def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
|
||||
"""Add a callback that will be called when a user joins a room.
|
||||
|
||||
This only fires on genuine membership changes, e.g. "invite" -> "join".
|
||||
Membership transitions like "join" -> "join" (for e.g. displayname changes) do
|
||||
not trigger the callback.
|
||||
|
||||
When called, the callback receives two arguments: the event ID and the room ID.
|
||||
It should *not* return a Deferred - if it needs to do any asynchronous work, a
|
||||
background thread should be started and wrapped with run_as_background_process.
|
||||
"""
|
||||
self._new_join_in_room_callbacks.append(cb)
|
||||
|
||||
async def on_new_room_event(
|
||||
self,
|
||||
event: EventBase,
|
||||
@@ -723,6 +737,10 @@ class Notifier:
|
||||
for cb in self.replication_callbacks:
|
||||
cb()
|
||||
|
||||
def notify_user_joined_room(self, event_id: str, room_id: str) -> None:
|
||||
for cb in self._new_join_in_room_callbacks:
|
||||
cb(event_id, room_id)
|
||||
|
||||
def notify_remote_server_up(self, server: str) -> None:
|
||||
"""Notify any replication that a remote server has come back up"""
|
||||
# We call federation_sender directly rather than registering as a
|
||||
|
||||
@@ -21,7 +21,7 @@ from twisted.internet.interfaces import IAddress, IConnector
|
||||
from twisted.internet.protocol import ReconnectingClientFactory
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.api.constants import EventTypes, ReceiptTypes
|
||||
from synapse.api.constants import EventTypes, Membership, ReceiptTypes
|
||||
from synapse.federation import send_queue
|
||||
from synapse.federation.sender import FederationSender
|
||||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
||||
@@ -219,6 +219,21 @@ class ReplicationDataHandler:
|
||||
membership=row.data.membership,
|
||||
)
|
||||
|
||||
# If this event is a join, make a note of it so we have an accurate
|
||||
# cross-worker room rate limit.
|
||||
# TODO: Erik said we should exclude rows that came from ex_outliers
|
||||
# here, but I don't see how we can determine that. I guess we could
|
||||
# add a flag to row.data?
|
||||
if (
|
||||
row.data.type == EventTypes.Member
|
||||
and row.data.membership == Membership.JOIN
|
||||
and not row.data.outlier
|
||||
):
|
||||
# TODO retrieve the previous state, and exclude join -> join transitions
|
||||
self.notifier.notify_user_joined_room(
|
||||
row.data.event_id, row.data.room_id
|
||||
)
|
||||
|
||||
await self._presence_handler.process_replication_rows(
|
||||
stream_name, instance_name, token, rows
|
||||
)
|
||||
|
||||
@@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow):
|
||||
relates_to: Optional[str]
|
||||
membership: Optional[str]
|
||||
rejected: bool
|
||||
outlier: bool
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
|
||||
@@ -1465,7 +1465,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
async def get_all_new_forward_event_rows(
|
||||
self, instance_name: str, last_id: int, current_id: int, limit: int
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
|
||||
"""Returns new events, for the Events replication stream
|
||||
|
||||
Args:
|
||||
@@ -1481,10 +1481,11 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
def get_all_new_forward_event_rows(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
|
||||
sql = (
|
||||
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
|
||||
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
|
||||
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
|
||||
" e.outlier"
|
||||
" FROM events AS e"
|
||||
" LEFT JOIN redactions USING (event_id)"
|
||||
" LEFT JOIN state_events AS se USING (event_id)"
|
||||
@@ -1498,7 +1499,8 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
)
|
||||
txn.execute(sql, (last_id, current_id, instance_name, limit))
|
||||
return cast(
|
||||
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
|
||||
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
|
||||
txn.fetchall(),
|
||||
)
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
@@ -1507,7 +1509,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
async def get_ex_outlier_stream_rows(
|
||||
self, instance_name: str, last_id: int, current_id: int
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
|
||||
"""Returns de-outliered events, for the Events replication stream
|
||||
|
||||
Args:
|
||||
@@ -1522,11 +1524,14 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
def get_ex_outlier_stream_rows_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
|
||||
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
|
||||
sql = (
|
||||
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
|
||||
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
|
||||
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
|
||||
" e.outlier"
|
||||
" FROM events AS e"
|
||||
# NB: the next line (inner join) is what makes this query different from
|
||||
# get_all_new_forward_event_rows.
|
||||
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
|
||||
" LEFT JOIN redactions USING (event_id)"
|
||||
" LEFT JOIN state_events AS se USING (event_id)"
|
||||
@@ -1541,7 +1546,8 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
txn.execute(sql, (last_id, current_id, instance_name))
|
||||
return cast(
|
||||
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
|
||||
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
|
||||
txn.fetchall(),
|
||||
)
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
|
||||
@@ -314,3 +314,77 @@ class TestRatelimiter(unittest.HomeserverTestCase):
|
||||
|
||||
# Check that we get rate limited after using that token.
|
||||
self.assertFalse(consume_at(11.1))
|
||||
|
||||
def test_record_action_which_doesnt_fill_bucket(self) -> None:
|
||||
limiter = Ratelimiter(
|
||||
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
|
||||
)
|
||||
|
||||
# Observe two actions, leaving room in the bucket for one more.
|
||||
limiter.record_action(requester=None, key="a", n_actions=2, _time_now_s=0.0)
|
||||
|
||||
# We should be able to take a new action now.
|
||||
success, _ = self.get_success_or_raise(
|
||||
limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
|
||||
)
|
||||
self.assertTrue(success)
|
||||
|
||||
# ... but not two.
|
||||
success, _ = self.get_success_or_raise(
|
||||
limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
|
||||
)
|
||||
self.assertFalse(success)
|
||||
|
||||
def test_record_action_which_fills_bucket(self) -> None:
|
||||
limiter = Ratelimiter(
|
||||
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
|
||||
)
|
||||
|
||||
# Observe three actions, filling up the bucket.
|
||||
limiter.record_action(requester=None, key="a", n_actions=3, _time_now_s=0.0)
|
||||
|
||||
# We should be unable to take a new action now.
|
||||
success, _ = self.get_success_or_raise(
|
||||
limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
|
||||
)
|
||||
self.assertFalse(success)
|
||||
|
||||
# If we wait 10 seconds to leak a token, we should be able to take one action...
|
||||
success, _ = self.get_success_or_raise(
|
||||
limiter.can_do_action(requester=None, key="a", _time_now_s=10.0)
|
||||
)
|
||||
self.assertTrue(success)
|
||||
|
||||
# ... but not two.
|
||||
success, _ = self.get_success_or_raise(
|
||||
limiter.can_do_action(requester=None, key="a", _time_now_s=10.0)
|
||||
)
|
||||
self.assertFalse(success)
|
||||
|
||||
def test_record_action_which_overfills_bucket(self) -> None:
|
||||
limiter = Ratelimiter(
|
||||
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
|
||||
)
|
||||
|
||||
# Observe four actions, exceeding the bucket.
|
||||
limiter.record_action(requester=None, key="a", n_actions=4, _time_now_s=0.0)
|
||||
|
||||
# We should be prevented from taking a new action now.
|
||||
success, _ = self.get_success_or_raise(
|
||||
limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
|
||||
)
|
||||
self.assertFalse(success)
|
||||
|
||||
# If we wait 10 seconds to leak a token, we should be unable to take an action
|
||||
# because the bucket is still full.
|
||||
success, _ = self.get_success_or_raise(
|
||||
limiter.can_do_action(requester=None, key="a", _time_now_s=10.0)
|
||||
)
|
||||
self.assertFalse(success)
|
||||
|
||||
# But after another 10 seconds we leak a second token, giving us room for
|
||||
# action.
|
||||
success, _ = self.get_success_or_raise(
|
||||
limiter.can_do_action(requester=None, key="a", _time_now_s=20.0)
|
||||
)
|
||||
self.assertTrue(success)
|
||||
|
||||
@@ -45,7 +45,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
|
||||
# mock up some events to use in the response.
|
||||
# In real life, these would have things in `prev_events` and `auth_events`, but that's
|
||||
# a bit annoying to mock up, and the code under test doesn't care, so we don't bother.
|
||||
create_event_dict = self.add_hashes_and_signatures(
|
||||
create_event_dict = self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"room_id": test_room_id,
|
||||
"type": "m.room.create",
|
||||
@@ -57,7 +57,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
|
||||
"origin_server_ts": 500,
|
||||
}
|
||||
)
|
||||
member_event_dict = self.add_hashes_and_signatures(
|
||||
member_event_dict = self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"room_id": test_room_id,
|
||||
"type": "m.room.member",
|
||||
@@ -69,7 +69,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
|
||||
"origin_server_ts": 600,
|
||||
}
|
||||
)
|
||||
pl_event_dict = self.add_hashes_and_signatures(
|
||||
pl_event_dict = self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"room_id": test_room_id,
|
||||
"type": "m.room.power_levels",
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
|
||||
from parameterized import parameterized
|
||||
|
||||
@@ -20,7 +21,6 @@ from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.config.server import DEFAULT_ROOM_VERSION
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.events import make_event_from_dict
|
||||
from synapse.federation.federation_server import server_matches_acl_event
|
||||
from synapse.rest import admin
|
||||
@@ -148,7 +148,7 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
|
||||
tok2 = self.login("fozzie", "bear")
|
||||
self.helper.join(self._room_id, second_member_user_id, tok=tok2)
|
||||
|
||||
def _make_join(self, user_id) -> JsonDict:
|
||||
def _make_join(self, user_id: str) -> JsonDict:
|
||||
channel = self.make_signed_federation_request(
|
||||
"GET",
|
||||
f"/_matrix/federation/v1/make_join/{self._room_id}/{user_id}"
|
||||
@@ -163,11 +163,9 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
|
||||
join_result = self._make_join(joining_user)
|
||||
|
||||
join_event_dict = join_result["event"]
|
||||
add_hashes_and_signatures(
|
||||
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
join_event_dict,
|
||||
signature_name=self.OTHER_SERVER_NAME,
|
||||
signing_key=self.OTHER_SERVER_SIGNATURE_KEY,
|
||||
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
|
||||
)
|
||||
channel = self.make_signed_federation_request(
|
||||
"PUT",
|
||||
@@ -220,11 +218,9 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
|
||||
join_result = self._make_join(joining_user)
|
||||
|
||||
join_event_dict = join_result["event"]
|
||||
add_hashes_and_signatures(
|
||||
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
join_event_dict,
|
||||
signature_name=self.OTHER_SERVER_NAME,
|
||||
signing_key=self.OTHER_SERVER_SIGNATURE_KEY,
|
||||
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
|
||||
)
|
||||
channel = self.make_signed_federation_request(
|
||||
"PUT",
|
||||
@@ -264,6 +260,67 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
|
||||
)
|
||||
self.assertEqual(r[("m.room.member", joining_user)].membership, "join")
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}})
|
||||
def test_make_join_respects_room_join_rate_limit(self) -> None:
|
||||
# In the test setup, two users join the room. Since the rate limiter burst
|
||||
# count is 3, a new make_join request to the room should be accepted.
|
||||
|
||||
joining_user = "@ronniecorbett:" + self.OTHER_SERVER_NAME
|
||||
self._make_join(joining_user)
|
||||
|
||||
# Now have a new local user join the room. This saturates the rate limiter
|
||||
# bucket, so the next make_join should be denied.
|
||||
new_local_user = self.register_user("animal", "animal")
|
||||
token = self.login("animal", "animal")
|
||||
self.helper.join(self._room_id, new_local_user, tok=token)
|
||||
|
||||
joining_user = "@ronniebarker:" + self.OTHER_SERVER_NAME
|
||||
channel = self.make_signed_federation_request(
|
||||
"GET",
|
||||
f"/_matrix/federation/v1/make_join/{self._room_id}/{joining_user}"
|
||||
f"?ver={DEFAULT_ROOM_VERSION}",
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body)
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}})
|
||||
def test_send_join_contributes_to_room_join_rate_limit_and_is_limited(self) -> None:
|
||||
# Make two make_join requests up front. (These are rate limited, but do not
|
||||
# contribute to the rate limit.)
|
||||
join_event_dicts = []
|
||||
for i in range(2):
|
||||
joining_user = f"@misspiggy{i}:{self.OTHER_SERVER_NAME}"
|
||||
join_result = self._make_join(joining_user)
|
||||
join_event_dict = join_result["event"]
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
join_event_dict,
|
||||
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
|
||||
)
|
||||
join_event_dicts.append(join_event_dict)
|
||||
|
||||
# In the test setup, two users join the room. Since the rate limiter burst
|
||||
# count is 3, the first send_join should be accepted...
|
||||
channel = self.make_signed_federation_request(
|
||||
"PUT",
|
||||
f"/_matrix/federation/v2/send_join/{self._room_id}/join0",
|
||||
content=join_event_dicts[0],
|
||||
)
|
||||
self.assertEqual(channel.code, 200, channel.json_body)
|
||||
|
||||
# ... but the second should be denied.
|
||||
channel = self.make_signed_federation_request(
|
||||
"PUT",
|
||||
f"/_matrix/federation/v2/send_join/{self._room_id}/join1",
|
||||
content=join_event_dicts[1],
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body)
|
||||
|
||||
# NB: we could write a test which checks that the send_join event is seen
|
||||
# by other workers over replication, and that they update their rate limit
|
||||
# buckets accordingly. I'm going to assume that the join event gets sent over
|
||||
# replication, at which point the tests.handlers.room_member test
|
||||
# test_local_users_joining_on_another_worker_contribute_to_rate_limit
|
||||
# is probably sufficient to reassure that the bucket is updated.
|
||||
|
||||
|
||||
def _create_acl_event(content):
|
||||
return make_event_from_dict(
|
||||
|
||||
@@ -256,7 +256,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
|
||||
]
|
||||
for _ in range(0, 8):
|
||||
event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"origin_server_ts": 1,
|
||||
"type": "m.room.message",
|
||||
|
||||
@@ -104,7 +104,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
|
||||
# mock up a load of state events which we are missing
|
||||
state_events = [
|
||||
make_event_from_dict(
|
||||
self.add_hashes_and_signatures(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"type": "test_state_type",
|
||||
"state_key": f"state_{i}",
|
||||
@@ -131,7 +131,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
|
||||
# Depending on the test, we either persist this upfront (as an outlier),
|
||||
# or let the server request it.
|
||||
prev_event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"type": "test_regular_type",
|
||||
"room_id": room_id,
|
||||
@@ -165,7 +165,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
|
||||
|
||||
# mock up a regular event to pass into _process_pulled_event
|
||||
pulled_event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"type": "test_regular_type",
|
||||
"room_id": room_id,
|
||||
|
||||
290
tests/handlers/test_room_member.py
Normal file
290
tests/handlers/test_room_member.py
Normal file
@@ -0,0 +1,290 @@
|
||||
from http import HTTPStatus
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
import synapse.rest.admin
|
||||
import synapse.rest.client.login
|
||||
import synapse.rest.client.room
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import LimitExceededError
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.events import FrozenEventV3
|
||||
from synapse.federation.federation_client import SendJoinResult
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import UserID, create_requester
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests.replication._base import RedisMultiWorkerStreamTestCase
|
||||
from tests.server import make_request
|
||||
from tests.test_utils import make_awaitable
|
||||
from tests.unittest import FederatingHomeserverTestCase, override_config
|
||||
|
||||
|
||||
class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase):
|
||||
servlets = [
|
||||
synapse.rest.admin.register_servlets,
|
||||
synapse.rest.client.login.register_servlets,
|
||||
synapse.rest.client.room.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.handler = hs.get_room_member_handler()
|
||||
|
||||
# Create three users.
|
||||
self.alice = self.register_user("alice", "pass")
|
||||
self.alice_token = self.login("alice", "pass")
|
||||
self.bob = self.register_user("bob", "pass")
|
||||
self.bob_token = self.login("bob", "pass")
|
||||
self.chris = self.register_user("chris", "pass")
|
||||
self.chris_token = self.login("chris", "pass")
|
||||
|
||||
# Create a room on this homeserver. Note that this counts as a join: it
|
||||
# contributes to the rate limter's count of actions
|
||||
self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)
|
||||
|
||||
self.intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}"
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
|
||||
def test_local_user_local_joins_contribute_to_limit_and_are_limited(self) -> None:
|
||||
# The rate limiter has accumulated one token from Alice's join after the create
|
||||
# event.
|
||||
# Try joining the room as Bob.
|
||||
self.get_success(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.bob),
|
||||
target=UserID.from_string(self.bob),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
)
|
||||
)
|
||||
|
||||
# The rate limiter bucket is full. A second join should be denied.
|
||||
self.get_failure(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.chris),
|
||||
target=UserID.from_string(self.chris),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
),
|
||||
LimitExceededError,
|
||||
)
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
|
||||
def test_local_user_profile_edits_dont_contribute_to_limit(self) -> None:
|
||||
# The rate limiter has accumulated one token from Alice's join after the create
|
||||
# event. Alice should still be able to change her displayname.
|
||||
self.get_success(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.alice),
|
||||
target=UserID.from_string(self.alice),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
content={"displayname": "Alice Cooper"},
|
||||
)
|
||||
)
|
||||
|
||||
# Still room in the limiter bucket. Chris's join should be accepted.
|
||||
self.get_success(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.chris),
|
||||
target=UserID.from_string(self.chris),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
)
|
||||
)
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 1}})
|
||||
def test_remote_joins_contribute_to_rate_limit(self) -> None:
|
||||
# Join once, to fill the rate limiter bucket.
|
||||
#
|
||||
# To do this we have to mock the responses from the remote homeserver.
|
||||
# We also patch out a bunch of event checks on our end. All we're really
|
||||
# trying to check here is that remote joins will bump the rate limter when
|
||||
# they are persisted.
|
||||
create_event_source = {
|
||||
"auth_events": [],
|
||||
"content": {
|
||||
"creator": f"@creator:{self.OTHER_SERVER_NAME}",
|
||||
"room_version": self.hs.config.server.default_room_version.identifier,
|
||||
},
|
||||
"depth": 0,
|
||||
"origin_server_ts": 0,
|
||||
"prev_events": [],
|
||||
"room_id": self.intially_unjoined_room_id,
|
||||
"sender": f"@creator:{self.OTHER_SERVER_NAME}",
|
||||
"state_key": "",
|
||||
"type": EventTypes.Create,
|
||||
}
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
create_event_source,
|
||||
self.hs.config.server.default_room_version,
|
||||
)
|
||||
create_event = FrozenEventV3(
|
||||
create_event_source,
|
||||
self.hs.config.server.default_room_version,
|
||||
{},
|
||||
None,
|
||||
)
|
||||
|
||||
join_event_source = {
|
||||
"auth_events": [create_event.event_id],
|
||||
"content": {"membership": "join"},
|
||||
"depth": 1,
|
||||
"origin_server_ts": 100,
|
||||
"prev_events": [create_event.event_id],
|
||||
"sender": self.bob,
|
||||
"state_key": self.bob,
|
||||
"room_id": self.intially_unjoined_room_id,
|
||||
"type": EventTypes.Member,
|
||||
}
|
||||
add_hashes_and_signatures(
|
||||
self.hs.config.server.default_room_version,
|
||||
join_event_source,
|
||||
self.hs.hostname,
|
||||
self.hs.signing_key,
|
||||
)
|
||||
join_event = FrozenEventV3(
|
||||
join_event_source,
|
||||
self.hs.config.server.default_room_version,
|
||||
{},
|
||||
None,
|
||||
)
|
||||
|
||||
mock_make_membership_event = Mock(
|
||||
return_value=make_awaitable(
|
||||
(
|
||||
self.OTHER_SERVER_NAME,
|
||||
join_event,
|
||||
self.hs.config.server.default_room_version,
|
||||
)
|
||||
)
|
||||
)
|
||||
mock_send_join = Mock(
|
||||
return_value=make_awaitable(
|
||||
SendJoinResult(
|
||||
join_event,
|
||||
self.OTHER_SERVER_NAME,
|
||||
state=[create_event],
|
||||
auth_chain=[create_event],
|
||||
partial_state=False,
|
||||
servers_in_room=[],
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
with patch.object(
|
||||
self.handler.federation_handler.federation_client,
|
||||
"make_membership_event",
|
||||
mock_make_membership_event,
|
||||
), patch.object(
|
||||
self.handler.federation_handler.federation_client,
|
||||
"send_join",
|
||||
mock_send_join,
|
||||
), patch(
|
||||
"synapse.event_auth._is_membership_change_allowed",
|
||||
return_value=None,
|
||||
), patch(
|
||||
"synapse.handlers.federation_event.check_state_dependent_auth_rules",
|
||||
return_value=None,
|
||||
):
|
||||
self.get_success(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.bob),
|
||||
target=UserID.from_string(self.bob),
|
||||
room_id=self.intially_unjoined_room_id,
|
||||
action=Membership.JOIN,
|
||||
remote_room_hosts=[self.OTHER_SERVER_NAME],
|
||||
)
|
||||
)
|
||||
|
||||
# Try to join as Chris. Should get denied.
|
||||
self.get_failure(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.chris),
|
||||
target=UserID.from_string(self.chris),
|
||||
room_id=self.intially_unjoined_room_id,
|
||||
action=Membership.JOIN,
|
||||
remote_room_hosts=[self.OTHER_SERVER_NAME],
|
||||
),
|
||||
LimitExceededError,
|
||||
)
|
||||
|
||||
# TODO: test that remote joins to a room are rate limited.
|
||||
# Could do this by setting the burst count to 1, then:
|
||||
# - remote-joining a room
|
||||
# - immediately leaving
|
||||
# - trying to remote-join again.
|
||||
|
||||
|
||||
class TestReplicatedJoinsLimitedByPerRoomRateLimiter(RedisMultiWorkerStreamTestCase):
|
||||
servlets = [
|
||||
synapse.rest.admin.register_servlets,
|
||||
synapse.rest.client.login.register_servlets,
|
||||
synapse.rest.client.room.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.handler = hs.get_room_member_handler()
|
||||
|
||||
# Create three users.
|
||||
self.alice = self.register_user("alice", "pass")
|
||||
self.alice_token = self.login("alice", "pass")
|
||||
self.bob = self.register_user("bob", "pass")
|
||||
self.bob_token = self.login("bob", "pass")
|
||||
self.chris = self.register_user("chris", "pass")
|
||||
self.chris_token = self.login("chris", "pass")
|
||||
|
||||
# Create a room on this homeserver.
|
||||
# Note that this counts as a
|
||||
self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)
|
||||
self.intially_unjoined_room_id = "!example:otherhs"
|
||||
|
||||
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
|
||||
def test_local_users_joining_on_another_worker_contribute_to_rate_limit(
|
||||
self,
|
||||
) -> None:
|
||||
# The rate limiter has accumulated one token from Alice's join after the create
|
||||
# event.
|
||||
self.replicate()
|
||||
|
||||
# Spawn another worker and have bob join via it.
|
||||
worker_app = self.make_worker_hs(
|
||||
"synapse.app.generic_worker", extra_config={"worker_name": "other worker"}
|
||||
)
|
||||
worker_site = self._hs_to_site[worker_app]
|
||||
channel = make_request(
|
||||
self.reactor,
|
||||
worker_site,
|
||||
"POST",
|
||||
f"/_matrix/client/v3/rooms/{self.room_id}/join",
|
||||
access_token=self.bob_token,
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
|
||||
|
||||
# wait for join to arrive over replication
|
||||
self.replicate()
|
||||
|
||||
# Try to join as Chris on the worker. Should get denied because Alice
|
||||
# and Bob have both joined the room.
|
||||
self.get_failure(
|
||||
worker_app.get_room_member_handler().update_membership(
|
||||
requester=create_requester(self.chris),
|
||||
target=UserID.from_string(self.chris),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
),
|
||||
LimitExceededError,
|
||||
)
|
||||
|
||||
# Try to join as Chris on the original worker. Should get denied because Alice
|
||||
# and Bob have both joined the room.
|
||||
self.get_failure(
|
||||
self.handler.update_membership(
|
||||
requester=create_requester(self.chris),
|
||||
target=UserID.from_string(self.chris),
|
||||
room_id=self.room_id,
|
||||
action=Membership.JOIN,
|
||||
),
|
||||
LimitExceededError,
|
||||
)
|
||||
@@ -225,7 +225,7 @@ class OptionsResourceTests(unittest.TestCase):
|
||||
parse_listener_def({"type": "http", "port": 0}),
|
||||
self.resource,
|
||||
"1.0",
|
||||
max_request_body_size=1234,
|
||||
max_request_body_size=4096,
|
||||
reactor=self.reactor,
|
||||
)
|
||||
|
||||
|
||||
@@ -272,7 +272,7 @@ class FilterEventsForClientTestCase(unittest.FederatingHomeserverTestCase):
|
||||
"state_key": "@user:test",
|
||||
"content": {"membership": "invite"},
|
||||
}
|
||||
self.add_hashes_and_signatures(invite_pdu)
|
||||
self.add_hashes_and_signatures_from_other_server(invite_pdu)
|
||||
invite_event_id = make_event_from_dict(invite_pdu, RoomVersions.V9).event_id
|
||||
|
||||
self.get_success(
|
||||
|
||||
@@ -285,7 +285,7 @@ class HomeserverTestCase(TestCase):
|
||||
config=self.hs.config.server.listeners[0],
|
||||
resource=self.resource,
|
||||
server_version_string="1",
|
||||
max_request_body_size=1234,
|
||||
max_request_body_size=4096,
|
||||
reactor=self.reactor,
|
||||
)
|
||||
|
||||
@@ -780,7 +780,7 @@ class FederatingHomeserverTestCase(HomeserverTestCase):
|
||||
verify_key_id,
|
||||
FetchKeyResult(
|
||||
verify_key=verify_key,
|
||||
valid_until_ts=clock.time_msec() + 1000,
|
||||
valid_until_ts=clock.time_msec() + 10000,
|
||||
),
|
||||
)
|
||||
],
|
||||
@@ -838,7 +838,7 @@ class FederatingHomeserverTestCase(HomeserverTestCase):
|
||||
client_ip=client_ip,
|
||||
)
|
||||
|
||||
def add_hashes_and_signatures(
|
||||
def add_hashes_and_signatures_from_other_server(
|
||||
self,
|
||||
event_dict: JsonDict,
|
||||
room_version: RoomVersion = KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
|
||||
|
||||
@@ -151,6 +151,7 @@ def default_config(name, parse=False):
|
||||
"local": {"per_second": 10000, "burst_count": 10000},
|
||||
"remote": {"per_second": 10000, "burst_count": 10000},
|
||||
},
|
||||
"rc_joins_per_room": {"per_second": 10000, "burst_count": 10000},
|
||||
"rc_invites": {
|
||||
"per_room": {"per_second": 10000, "burst_count": 10000},
|
||||
"per_user": {"per_second": 10000, "burst_count": 10000},
|
||||
|
||||
Reference in New Issue
Block a user