Mirror of https://github.com/element-hq/synapse.git (synced 2025-12-19 02:20:44 +00:00)

Compare commits: erikj/fix_ ... dmr/rate-l (23 commits)
Commits (SHA1): 80e2e0b4f2, 5cf6700c2e, fda6252233, 47acf465ef, 8377172c4c, 81eb4ab86a, e16294ef3b, 9d9253109b, 4da8f29ff6, dcb16831e8, 121590a0c9, 240e32f264, 7a14b94698, 0bb4122726, 6b47e82ca2, 4230112526, ae788ca796, 77de15927a, 9d4cdae33a, c594ab774b, c2e3025b33, 453f621d23, bd873e6571
changelog.d/13169.feature (new file, 1 line)
@@ -0,0 +1 @@
+Add per-room rate limiting for room joins. For each room, Synapse now monitors the rate of join events in that room and throttles additional joins if that rate grows too large.
@@ -67,6 +67,10 @@ rc_joins:
   per_second: 9999
   burst_count: 9999
 
+rc_joins_per_room:
+  per_second: 9999
+  burst_count: 9999
+
 rc_3pid_validation:
   per_second: 1000
   burst_count: 1000
@@ -89,6 +89,18 @@ process, for example:
     dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     ```
 
+# Upgrading to v1.63.0
+
+## Changes to the event replication streams
+
+Synapse now includes a flag indicating if an event is an outlier when
+replicating it to other workers. This is a forwards- and backwards-incompatible
+change: v1.62 and earlier workers cannot process events replicated by v1.63
+workers, and vice versa.
+
+Once all workers are upgraded to v1.63 (or downgraded to v1.62), event
+replication will resume as normal.
+
 # Upgrading to v1.62.0
 
 ## New signatures for spam checker callbacks
@@ -1379,6 +1379,25 @@ rc_joins:
   per_second: 0.03
   burst_count: 12
 ```
+---
+### `rc_joins_per_room`
+
+This option allows admins to ratelimit joins to a room based on the number of recent
+joins (local or remote) to that room. It is intended to mitigate mass-join spam
+waves which target multiple homeservers.
+
+By default, one join is permitted to a room every second, with an accumulating
+buffer of up to ten instantaneous joins.
+
+Example configuration (default values):
+```yaml
+rc_joins_per_room:
+  per_second: 1
+  burst_count: 10
+```
+
+_Added in Synapse 1.63.0._
+
 ---
 ### `rc_3pid_validation`
 
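To make the documented behaviour concrete, here is a minimal standalone sketch (plain Python, not Synapse code; the function name and bookkeeping are illustrative only) of what the default `per_second: 1` / `burst_count: 10` values permit:

```python
# Illustrative only: mirrors the documented "leaky bucket" semantics of
# rc_joins_per_room under the default values (per_second=1, burst_count=10).

def join_allowed(tokens: float, last_empty_s: float, now_s: float,
                 per_second: float = 1.0, burst_count: int = 10) -> bool:
    # Tokens leak out of the bucket at `per_second` since it was last empty.
    current = max(0.0, tokens - (now_s - last_empty_s) * per_second)
    # A join costs one token; permit it only if the bucket has room.
    return current + 1 <= burst_count

tokens, last_empty = 0.0, 0.0
permitted = 0
for _ in range(12):  # twelve instantaneous join attempts at t=0
    if join_allowed(tokens, last_empty, now_s=0.0):
        tokens += 1
        permitted += 1
print(permitted)  # 10: the burst buffer; joins 11 and 12 are throttled
print(join_allowed(tokens, last_empty, now_s=1.0))  # True: one token has leaked
```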
@@ -27,6 +27,33 @@ class Ratelimiter:
     """
     Ratelimit actions marked by arbitrary keys.
 
+    (Note that the source code speaks of "actions" and "burst_count" rather than
+    "tokens" and a "bucket_size".)
+
+    This is a "leaky bucket as a meter". For each key to be tracked there is a bucket
+    containing some number 0 <= T <= `burst_count` of tokens corresponding to previously
+    permitted requests for that key. Each bucket starts empty, and gradually leaks
+    tokens at a rate of `rate_hz`.
+
+    Upon an incoming request, we must determine:
+    - the key that this request falls under (which bucket to inspect), and
+    - the cost C of this request in tokens.
+    Then, if there is room in the bucket for C tokens (T + C <= `burst_count`),
+    the request is permitted and C tokens are added to the bucket.
+    Otherwise the request is denied, and the bucket continues to hold T tokens.
+
+    This means that the limiter enforces an average request frequency of `rate_hz`,
+    while accumulating a buffer of up to `burst_count` requests which can be consumed
+    instantaneously.
+
+    The tricky bit is the leaking. We do not want to have a periodic process which
+    leaks every bucket! Instead, we track
+    - the time point when the bucket was last completely empty, and
+    - how many tokens have been added to the bucket since then.
+    Then for each incoming request, we can calculate how many tokens have leaked
+    since this time point, and use that to decide if we should accept or reject the
+    request.
+
     Args:
         clock: A homeserver clock, for retrieving the current time
         rate_hz: The long term number of actions that can be performed in a second.
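The "no periodic leak" trick the docstring describes can be illustrated with a toy standalone class. This is a sketch under stated assumptions: it is not Synapse's `Ratelimiter`, which additionally supports per-request rates, requester handling, and pruning; the class and method names here are hypothetical.

```python
import time
from typing import Dict, Hashable, Tuple

class LazyLeakyBucket:
    """Toy 'leaky bucket as a meter' with on-demand leaking (no timers)."""

    def __init__(self, rate_hz: float, burst_count: int) -> None:
        self.rate_hz = rate_hz
        self.burst_count = burst_count
        # key -> (tokens added since last empty, time the bucket was last empty)
        self.buckets: Dict[Hashable, Tuple[float, float]] = {}

    def try_consume(self, key: Hashable, cost: float = 1.0) -> bool:
        now = time.monotonic()
        added, since = self.buckets.get(key, (0.0, now))
        # Compute how many tokens have leaked away since the bucket was last
        # empty, instead of draining every bucket on a schedule.
        current = added - (now - since) * self.rate_hz
        if current < 0.0:
            # Fully drained: reset the reference point to "empty now".
            added, since, current = 0.0, now, 0.0
        if current + cost > self.burst_count:
            return False  # no room: deny, and leave the bucket unchanged
        self.buckets[key] = (added + cost, since)
        return True
```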
@@ -41,14 +68,36 @@ class Ratelimiter:
         self.burst_count = burst_count
         self.store = store
 
-        # A ordered dictionary keeping track of actions, when they were last
-        # performed and how often. Each entry is a mapping from a key of arbitrary type
-        # to a tuple representing:
-        #   * How many times an action has occurred since a point in time
-        #   * The point in time
-        #   * The rate_hz of this particular entry. This can vary per request
+        # An ordered dictionary representing the token buckets tracked by this rate
+        # limiter. Each entry maps a key of arbitrary type to a tuple representing:
+        #   * The number of tokens currently in the bucket,
+        #   * The time point when the bucket was last completely empty, and
+        #   * The rate_hz (leak rate) of this particular bucket.
         self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()
 
+    def _get_key(
+        self, requester: Optional[Requester], key: Optional[Hashable]
+    ) -> Hashable:
+        """Use the requester's MXID as a fallback key if no key is provided.
+
+        Pulled out so that `can_do_action` and `record_action` are consistent.
+        """
+        if key is None:
+            if not requester:
+                raise ValueError("Must supply at least one of `requester` or `key`")
+
+            key = requester.user.to_string()
+        return key
+
+    def _get_action_counts(
+        self, key: Hashable, time_now_s: float
+    ) -> Tuple[float, float, float]:
+        """Retrieve the action counts, with a fallback representing an empty bucket.
+
+        Pulled out so that `can_do_action` and `record_action` are consistent.
+        """
+        return self.actions.get(key, (0.0, time_now_s, 0.0))
+
     async def can_do_action(
         self,
         requester: Optional[Requester],
@@ -88,11 +137,7 @@ class Ratelimiter:
               * The reactor timestamp for when the action can be performed next.
                 -1 if rate_hz is less than or equal to zero
         """
-        if key is None:
-            if not requester:
-                raise ValueError("Must supply at least one of `requester` or `key`")
-
-            key = requester.user.to_string()
+        key = self._get_key(requester, key)
 
         if requester:
             # Disable rate limiting of users belonging to any AS that is configured
@@ -121,7 +166,7 @@ class Ratelimiter:
         self._prune_message_counts(time_now_s)
 
         # Check if there is an existing count entry for this key
-        action_count, time_start, _ = self.actions.get(key, (0.0, time_now_s, 0.0))
+        action_count, time_start, _ = self._get_action_counts(key, time_now_s)
 
         # Check whether performing another action is allowed
         time_delta = time_now_s - time_start
@@ -164,6 +209,37 @@ class Ratelimiter:
 
         return allowed, time_allowed
 
+    def record_action(
+        self,
+        requester: Optional[Requester],
+        key: Optional[Hashable] = None,
+        n_actions: int = 1,
+        _time_now_s: Optional[float] = None,
+    ) -> None:
+        """Record that one or more actions took place, even if they violate the rate limit.
+
+        This is useful for tracking the frequency of events that happen across
+        federation which we still want to impose local rate limits on. For instance, if
+        we are alice.com monitoring a particular room, we cannot prevent bob.com
+        from joining users to that room. However, we can track the number of recent
+        joins in the room and refuse to serve new joins ourselves if there have been too
+        many in the room across both homeservers.
+
+        Args:
+            requester: The requester that is doing the action, if any.
+            key: An arbitrary key used to classify an action. Defaults to the
+                requester's user ID.
+            n_actions: The number of times the user wants to do this action. If the user
+                cannot do all of the actions, the user's action count is not incremented
+                at all.
+            _time_now_s: The current time. Optional, defaults to the current time according
+                to self.clock. Only used by tests.
+        """
+        key = self._get_key(requester, key)
+        time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
+        action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s)
+        self.actions[key] = (action_count + n_actions, time_start, rate_hz)
+
     def _prune_message_counts(self, time_now_s: float) -> None:
         """Remove message count entries that have not exceeded their defined
         rate_hz limit
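Putting `ratelimit` and `record_action` side by side, the intended division of labour looks roughly like this. This is hypothetical caller code, not part of the diff; `limiter` stands for a `Ratelimiter` keyed by room ID, and the double-counting rationale in the comments is inferred from how the rest of this changeset wires the pieces together.

```python
# Hypothetical caller code sketching the two entry points of Ratelimiter.

async def check_join_allowed(limiter, requester, room_id: str) -> None:
    # Joins actioned by this worker: check the per-room bucket up front.
    # update=False checks without consuming a token; the join is counted
    # later, when the persisted event reaches the notifier, so it is not
    # double-counted here. Raises LimitExceededError when the bucket is full.
    await limiter.ratelimit(requester, key=room_id, update=False)

def on_join_observed(limiter, room_id: str) -> None:
    # Joins performed elsewhere (another homeserver, or another worker in
    # this one): we cannot refuse them, but they must still drain the
    # room's budget, even past the configured limit.
    limiter.record_action(requester=None, key=room_id)
```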
@@ -112,6 +112,13 @@ class RatelimitConfig(Config):
             defaults={"per_second": 0.01, "burst_count": 10},
         )
 
+        # Track the rate of joins to a given room. If there are too many, temporarily
+        # prevent local joins and remote joins via this server.
+        self.rc_joins_per_room = RateLimitConfig(
+            config.get("rc_joins_per_room", {}),
+            defaults={"per_second": 1, "burst_count": 10},
+        )
+
         # Ratelimit cross-user key requests:
         # * For local requests this is keyed by the sending device.
         # * For requests received over federation this is keyed by the origin.
@@ -117,6 +117,7 @@ class FederationServer(FederationBase):
         self._federation_event_handler = hs.get_federation_event_handler()
         self.state = hs.get_state_handler()
         self._event_auth_handler = hs.get_event_auth_handler()
+        self._room_member_handler = hs.get_room_member_handler()
 
         self._state_storage_controller = hs.get_storage_controllers().state
 

@@ -620,6 +621,15 @@ class FederationServer(FederationBase):
             )
             raise IncompatibleRoomVersionError(room_version=room_version)
 
+        # Refuse the request if that room has seen too many joins recently.
+        # This is in addition to the HS-level rate limiting applied by
+        # BaseFederationServlet.
+        # type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
+        await self._room_member_handler._join_rate_per_room_limiter.ratelimit(  # type: ignore[has-type]
+            requester=None,
+            key=room_id,
+            update=False,
+        )
         pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
         return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
 

@@ -654,6 +664,12 @@ class FederationServer(FederationBase):
         room_id: str,
         caller_supports_partial_state: bool = False,
     ) -> Dict[str, Any]:
+        await self._room_member_handler._join_rate_per_room_limiter.ratelimit(  # type: ignore[has-type]
+            requester=None,
+            key=room_id,
+            update=False,
+        )
+
         event, context = await self._on_send_membership_event(
             origin, content, Membership.JOIN, room_id
         )
@@ -2063,6 +2063,10 @@ class FederationEventHandler:
             event, event_pos, max_stream_token, extra_users=extra_users
         )
 
+        if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+            # TODO retrieve the previous state, and exclude join -> join transitions
+            self._notifier.notify_user_joined_room(event.event_id, event.room_id)
+
     def _sanity_check_event(self, ev: EventBase) -> None:
         """
         Do some early sanity checks of a received event
@@ -461,6 +461,7 @@ class EventCreationHandler:
         )
         self._events_shard_config = self.config.worker.events_shard_config
         self._instance_name = hs.get_instance_name()
+        self._notifier = hs.get_notifier()
 
         self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
 

@@ -1515,6 +1516,16 @@ class EventCreationHandler:
                 requester, is_admin_redaction=is_admin_redaction
             )
 
+        if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+            (
+                current_membership,
+                _,
+            ) = await self.store.get_local_current_membership_for_user_in_room(
+                event.state_key, event.room_id
+            )
+            if current_membership != Membership.JOIN:
+                self._notifier.notify_user_joined_room(event.event_id, event.room_id)
+
         await self._maybe_kick_guest_users(event, context)
 
         if event.type == EventTypes.CanonicalAlias:
@@ -94,12 +94,29 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
             burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
         )
+        # Tracks joins from local users to rooms this server isn't a member of.
+        # I.e. joins this server makes by requesting /make_join /send_join from
+        # another server.
         self._join_rate_limiter_remote = Ratelimiter(
             store=self.store,
             clock=self.clock,
             rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
             burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
         )
+        # TODO: find a better place to keep this Ratelimiter.
+        #   It needs to be
+        #   - written to by event persistence code
+        #   - written to by something which can snoop on replication streams
+        #   - read by the RoomMemberHandler to rate limit joins from local users
+        #   - read by the FederationServer to rate limit make_joins and send_joins from
+        #     other homeservers
+        #   I wonder if a homeserver-wide collection of rate limiters might be cleaner?
+        self._join_rate_per_room_limiter = Ratelimiter(
+            store=self.store,
+            clock=self.clock,
+            rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second,
+            burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count,
+        )
 
         self._invites_per_room_limiter = Ratelimiter(
             store=self.store,
@@ -122,6 +139,18 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         )
 
         self.request_ratelimiter = hs.get_request_ratelimiter()
+        hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
+
+    def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
+        """Notify the rate limiter that a room join has occurred.
+
+        Use this to inform the RoomMemberHandler about joins that have either
+        - taken place on another homeserver, or
+        - taken place on another worker in this homeserver.
+
+        Joins actioned by this worker should use the usual `ratelimit` method, which
+        checks the limit and increments the counter in one go.
+        """
+        self._join_rate_per_room_limiter.record_action(requester=None, key=room_id)
 
     @abc.abstractmethod
     async def _remote_join(
@@ -375,6 +404,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             # up blocking profile updates.
             if newly_joined and ratelimit:
                 await self._join_rate_limiter_local.ratelimit(requester)
+                await self._join_rate_per_room_limiter.ratelimit(
+                    requester, key=room_id, update=False
+                )
 
         result_event = await self.event_creation_handler.handle_new_client_event(
             requester,
@@ -823,6 +855,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             await self._join_rate_limiter_remote.ratelimit(
                 requester,
             )
+            await self._join_rate_per_room_limiter.ratelimit(
+                requester,
+                key=room_id,
+                update=False,
+            )
 
         inviter = await self._get_inviter(target.to_string(), room_id)
         if inviter and not self.hs.is_mine(inviter):
@@ -228,6 +228,7 @@ class Notifier:
 
         # Called when there are new things to stream over replication
         self.replication_callbacks: List[Callable[[], None]] = []
+        self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = []
 
         self._federation_client = hs.get_federation_http_client()
 

@@ -280,6 +281,19 @@ class Notifier:
         """
         self.replication_callbacks.append(cb)
 
+    def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
+        """Add a callback that will be called when a user joins a room.
+
+        This only fires on genuine membership changes, e.g. "invite" -> "join".
+        Membership transitions like "join" -> "join" (for e.g. displayname changes) do
+        not trigger the callback.
+
+        When called, the callback receives two arguments: the event ID and the room ID.
+        It should *not* return a Deferred - if it needs to do any asynchronous work, a
+        background thread should be started and wrapped with run_as_background_process.
+        """
+        self._new_join_in_room_callbacks.append(cb)
+
     async def on_new_room_event(
         self,
         event: EventBase,

@@ -723,6 +737,10 @@ class Notifier:
         for cb in self.replication_callbacks:
             cb()
 
+    def notify_user_joined_room(self, event_id: str, room_id: str) -> None:
+        for cb in self._new_join_in_room_callbacks:
+            cb(event_id, room_id)
+
     def notify_remote_server_up(self, server: str) -> None:
         """Notify any replication that a remote server has come back up"""
         # We call federation_sender directly rather than registering as a
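As a usage sketch, hypothetical consumer code (not from this changeset) wiring a simple counter up to the new hook could look like the following; `hs` (a HomeServer) is assumed to be in scope:

```python
# Hypothetical consumer of the new Notifier hook. Per the docstring above,
# the callback must be synchronous and must not return a Deferred.
joins_seen: dict = {}

def _count_join(event_id: str, room_id: str) -> None:
    # Cheap, synchronous bookkeeping only; long-running work would need to be
    # handed off to a background process instead.
    joins_seen[room_id] = joins_seen.get(room_id, 0) + 1

hs.get_notifier().add_new_join_in_room_callback(_count_join)
```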
@@ -21,7 +21,7 @@ from twisted.internet.interfaces import IAddress, IConnector
 from twisted.internet.protocol import ReconnectingClientFactory
 from twisted.python.failure import Failure
 
-from synapse.api.constants import EventTypes, ReceiptTypes
+from synapse.api.constants import EventTypes, Membership, ReceiptTypes
 from synapse.federation import send_queue
 from synapse.federation.sender import FederationSender
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable

@@ -219,6 +219,21 @@ class ReplicationDataHandler:
                         membership=row.data.membership,
                     )
 
+                # If this event is a join, make a note of it so we have an accurate
+                # cross-worker room rate limit.
+                # TODO: Erik said we should exclude rows that came from ex_outliers
+                #   here, but I don't see how we can determine that. I guess we could
+                #   add a flag to row.data?
+                if (
+                    row.data.type == EventTypes.Member
+                    and row.data.membership == Membership.JOIN
+                    and not row.data.outlier
+                ):
+                    # TODO retrieve the previous state, and exclude join -> join transitions
+                    self.notifier.notify_user_joined_room(
+                        row.data.event_id, row.data.room_id
+                    )
+
             await self._presence_handler.process_replication_rows(
                 stream_name, instance_name, token, rows
             )
@@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow):
     relates_to: Optional[str]
     membership: Optional[str]
     rejected: bool
+    outlier: bool
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -1465,7 +1465,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     async def get_all_new_forward_event_rows(
         self, instance_name: str, last_id: int, current_id: int, limit: int
-    ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+    ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
         """Returns new events, for the Events replication stream
 
         Args:

@@ -1481,10 +1481,11 @@ class EventsWorkerStore(SQLBaseStore):
 
         def get_all_new_forward_event_rows(
             txn: LoggingTransaction,
-        ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+        ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
             sql = (
                 "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
+                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
+                " e.outlier"
                 " FROM events AS e"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events AS se USING (event_id)"

@@ -1498,7 +1499,8 @@ class EventsWorkerStore(SQLBaseStore):
             )
             txn.execute(sql, (last_id, current_id, instance_name, limit))
             return cast(
-                List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
+                List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
+                txn.fetchall(),
             )
 
         return await self.db_pool.runInteraction(

@@ -1507,7 +1509,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     async def get_ex_outlier_stream_rows(
         self, instance_name: str, last_id: int, current_id: int
-    ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+    ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
         """Returns de-outliered events, for the Events replication stream
 
         Args:

@@ -1522,11 +1524,14 @@ class EventsWorkerStore(SQLBaseStore):
 
         def get_ex_outlier_stream_rows_txn(
             txn: LoggingTransaction,
-        ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+        ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
             sql = (
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
+                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
+                " e.outlier"
                 " FROM events AS e"
+                # NB: the next line (inner join) is what makes this query different from
+                # get_all_new_forward_event_rows.
                 " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events AS se USING (event_id)"

@@ -1541,7 +1546,8 @@ class EventsWorkerStore(SQLBaseStore):
 
             txn.execute(sql, (last_id, current_id, instance_name))
             return cast(
-                List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
+                List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
+                txn.fetchall(),
            )
 
         return await self.db_pool.runInteraction(
@@ -314,3 +314,77 @@ class TestRatelimiter(unittest.HomeserverTestCase):
 
         # Check that we get rate limited after using that token.
         self.assertFalse(consume_at(11.1))
+
+    def test_record_action_which_doesnt_fill_bucket(self) -> None:
+        limiter = Ratelimiter(
+            store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
+        )
+
+        # Observe two actions, leaving room in the bucket for one more.
+        limiter.record_action(requester=None, key="a", n_actions=2, _time_now_s=0.0)
+
+        # We should be able to take a new action now.
+        success, _ = self.get_success_or_raise(
+            limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
+        )
+        self.assertTrue(success)
+
+        # ... but not two.
+        success, _ = self.get_success_or_raise(
+            limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
+        )
+        self.assertFalse(success)
+
+    def test_record_action_which_fills_bucket(self) -> None:
+        limiter = Ratelimiter(
+            store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
+        )
+
+        # Observe three actions, filling up the bucket.
+        limiter.record_action(requester=None, key="a", n_actions=3, _time_now_s=0.0)
+
+        # We should be unable to take a new action now.
+        success, _ = self.get_success_or_raise(
+            limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
+        )
+        self.assertFalse(success)
+
+        # If we wait 10 seconds to leak a token, we should be able to take one action...
+        success, _ = self.get_success_or_raise(
+            limiter.can_do_action(requester=None, key="a", _time_now_s=10.0)
+        )
+        self.assertTrue(success)
+
+        # ... but not two.
+        success, _ = self.get_success_or_raise(
+            limiter.can_do_action(requester=None, key="a", _time_now_s=10.0)
+        )
+        self.assertFalse(success)
+
+    def test_record_action_which_overfills_bucket(self) -> None:
+        limiter = Ratelimiter(
+            store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
+        )
+
+        # Observe four actions, exceeding the bucket.
+        limiter.record_action(requester=None, key="a", n_actions=4, _time_now_s=0.0)
+
+        # We should be prevented from taking a new action now.
+        success, _ = self.get_success_or_raise(
+            limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
+        )
+        self.assertFalse(success)
+
+        # If we wait 10 seconds to leak a token, we should be unable to take an action
+        # because the bucket is still full.
+        success, _ = self.get_success_or_raise(
+            limiter.can_do_action(requester=None, key="a", _time_now_s=10.0)
+        )
+        self.assertFalse(success)
+
+        # But after another 10 seconds we leak a second token, giving us room for
+        # action.
+        success, _ = self.get_success_or_raise(
+            limiter.can_do_action(requester=None, key="a", _time_now_s=20.0)
+        )
+        self.assertTrue(success)
@@ -45,7 +45,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
         # mock up some events to use in the response.
         # In real life, these would have things in `prev_events` and `auth_events`, but that's
         # a bit annoying to mock up, and the code under test doesn't care, so we don't bother.
-        create_event_dict = self.add_hashes_and_signatures(
+        create_event_dict = self.add_hashes_and_signatures_from_other_server(
             {
                 "room_id": test_room_id,
                 "type": "m.room.create",

@@ -57,7 +57,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
                 "origin_server_ts": 500,
             }
         )
-        member_event_dict = self.add_hashes_and_signatures(
+        member_event_dict = self.add_hashes_and_signatures_from_other_server(
             {
                 "room_id": test_room_id,
                 "type": "m.room.member",

@@ -69,7 +69,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
                 "origin_server_ts": 600,
             }
         )
-        pl_event_dict = self.add_hashes_and_signatures(
+        pl_event_dict = self.add_hashes_and_signatures_from_other_server(
             {
                 "room_id": test_room_id,
                 "type": "m.room.power_levels",
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from http import HTTPStatus
 
 from parameterized import parameterized
 

@@ -20,7 +21,6 @@ from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.config.server import DEFAULT_ROOM_VERSION
-from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events import make_event_from_dict
 from synapse.federation.federation_server import server_matches_acl_event
 from synapse.rest import admin

@@ -148,7 +148,7 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
         tok2 = self.login("fozzie", "bear")
         self.helper.join(self._room_id, second_member_user_id, tok=tok2)
 
-    def _make_join(self, user_id) -> JsonDict:
+    def _make_join(self, user_id: str) -> JsonDict:
         channel = self.make_signed_federation_request(
             "GET",
             f"/_matrix/federation/v1/make_join/{self._room_id}/{user_id}"

@@ -163,11 +163,9 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
         join_result = self._make_join(joining_user)
 
         join_event_dict = join_result["event"]
-        add_hashes_and_signatures(
-            KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
+        self.add_hashes_and_signatures_from_other_server(
             join_event_dict,
-            signature_name=self.OTHER_SERVER_NAME,
-            signing_key=self.OTHER_SERVER_SIGNATURE_KEY,
+            KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
         )
         channel = self.make_signed_federation_request(
             "PUT",

@@ -220,11 +218,9 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
         join_result = self._make_join(joining_user)
 
         join_event_dict = join_result["event"]
-        add_hashes_and_signatures(
-            KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
+        self.add_hashes_and_signatures_from_other_server(
             join_event_dict,
-            signature_name=self.OTHER_SERVER_NAME,
-            signing_key=self.OTHER_SERVER_SIGNATURE_KEY,
+            KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
         )
         channel = self.make_signed_federation_request(
             "PUT",

@@ -264,6 +260,67 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
         )
         self.assertEqual(r[("m.room.member", joining_user)].membership, "join")
 
+    @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}})
+    def test_make_join_respects_room_join_rate_limit(self) -> None:
+        # In the test setup, two users join the room. Since the rate limiter burst
+        # count is 3, a new make_join request to the room should be accepted.
+
+        joining_user = "@ronniecorbett:" + self.OTHER_SERVER_NAME
+        self._make_join(joining_user)
+
+        # Now have a new local user join the room. This saturates the rate limiter
+        # bucket, so the next make_join should be denied.
+        new_local_user = self.register_user("animal", "animal")
+        token = self.login("animal", "animal")
+        self.helper.join(self._room_id, new_local_user, tok=token)
+
+        joining_user = "@ronniebarker:" + self.OTHER_SERVER_NAME
+        channel = self.make_signed_federation_request(
+            "GET",
+            f"/_matrix/federation/v1/make_join/{self._room_id}/{joining_user}"
+            f"?ver={DEFAULT_ROOM_VERSION}",
+        )
+        self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body)
+
+    @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}})
+    def test_send_join_contributes_to_room_join_rate_limit_and_is_limited(self) -> None:
+        # Make two make_join requests up front. (These are rate limited, but do not
+        # contribute to the rate limit.)
+        join_event_dicts = []
+        for i in range(2):
+            joining_user = f"@misspiggy{i}:{self.OTHER_SERVER_NAME}"
+            join_result = self._make_join(joining_user)
+            join_event_dict = join_result["event"]
+            self.add_hashes_and_signatures_from_other_server(
+                join_event_dict,
+                KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
+            )
+            join_event_dicts.append(join_event_dict)
+
+        # In the test setup, two users join the room. Since the rate limiter burst
+        # count is 3, the first send_join should be accepted...
+        channel = self.make_signed_federation_request(
+            "PUT",
+            f"/_matrix/federation/v2/send_join/{self._room_id}/join0",
+            content=join_event_dicts[0],
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # ... but the second should be denied.
+        channel = self.make_signed_federation_request(
+            "PUT",
+            f"/_matrix/federation/v2/send_join/{self._room_id}/join1",
+            content=join_event_dicts[1],
+        )
+        self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body)
+
+        # NB: we could write a test which checks that the send_join event is seen
+        # by other workers over replication, and that they update their rate limit
+        # buckets accordingly. I'm going to assume that the join event gets sent over
+        # replication, at which point the tests.handlers.room_member test
+        # test_local_users_joining_on_another_worker_contribute_to_rate_limit
+        # is probably sufficient to reassure that the bucket is updated.
+
 
 def _create_acl_event(content):
     return make_event_from_dict(
@@ -256,7 +256,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
         ]
         for _ in range(0, 8):
             event = make_event_from_dict(
-                self.add_hashes_and_signatures(
+                self.add_hashes_and_signatures_from_other_server(
                     {
                         "origin_server_ts": 1,
                         "type": "m.room.message",

@@ -104,7 +104,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
         # mock up a load of state events which we are missing
         state_events = [
             make_event_from_dict(
-                self.add_hashes_and_signatures(
+                self.add_hashes_and_signatures_from_other_server(
                     {
                         "type": "test_state_type",
                         "state_key": f"state_{i}",

@@ -131,7 +131,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
         # Depending on the test, we either persist this upfront (as an outlier),
         # or let the server request it.
         prev_event = make_event_from_dict(
-            self.add_hashes_and_signatures(
+            self.add_hashes_and_signatures_from_other_server(
                 {
                     "type": "test_regular_type",
                     "room_id": room_id,

@@ -165,7 +165,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
 
         # mock up a regular event to pass into _process_pulled_event
         pulled_event = make_event_from_dict(
-            self.add_hashes_and_signatures(
+            self.add_hashes_and_signatures_from_other_server(
                 {
                     "type": "test_regular_type",
                     "room_id": room_id,
tests/handlers/test_room_member.py (new file, 290 lines)
@@ -0,0 +1,290 @@
from http import HTTPStatus
from unittest.mock import Mock, patch

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
import synapse.rest.client.login
import synapse.rest.client.room
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import LimitExceededError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events import FrozenEventV3
from synapse.federation.federation_client import SendJoinResult
from synapse.server import HomeServer
from synapse.types import UserID, create_requester
from synapse.util import Clock

from tests.replication._base import RedisMultiWorkerStreamTestCase
from tests.server import make_request
from tests.test_utils import make_awaitable
from tests.unittest import FederatingHomeserverTestCase, override_config


class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase):
    servlets = [
        synapse.rest.admin.register_servlets,
        synapse.rest.client.login.register_servlets,
        synapse.rest.client.room.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.handler = hs.get_room_member_handler()

        # Create three users.
        self.alice = self.register_user("alice", "pass")
        self.alice_token = self.login("alice", "pass")
        self.bob = self.register_user("bob", "pass")
        self.bob_token = self.login("bob", "pass")
        self.chris = self.register_user("chris", "pass")
        self.chris_token = self.login("chris", "pass")

        # Create a room on this homeserver. Note that this counts as a join: it
        # contributes to the rate limiter's count of actions.
        self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)

        self.intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}"

    @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
    def test_local_user_local_joins_contribute_to_limit_and_are_limited(self) -> None:
        # The rate limiter has accumulated one token from Alice's join after the create
        # event.
        # Try joining the room as Bob.
        self.get_success(
            self.handler.update_membership(
                requester=create_requester(self.bob),
                target=UserID.from_string(self.bob),
                room_id=self.room_id,
                action=Membership.JOIN,
            )
        )

        # The rate limiter bucket is full. A second join should be denied.
        self.get_failure(
            self.handler.update_membership(
                requester=create_requester(self.chris),
                target=UserID.from_string(self.chris),
                room_id=self.room_id,
                action=Membership.JOIN,
            ),
            LimitExceededError,
        )

    @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
    def test_local_user_profile_edits_dont_contribute_to_limit(self) -> None:
        # The rate limiter has accumulated one token from Alice's join after the create
        # event. Alice should still be able to change her displayname.
        self.get_success(
            self.handler.update_membership(
                requester=create_requester(self.alice),
                target=UserID.from_string(self.alice),
                room_id=self.room_id,
                action=Membership.JOIN,
                content={"displayname": "Alice Cooper"},
            )
        )

        # Still room in the limiter bucket. Chris's join should be accepted.
        self.get_success(
            self.handler.update_membership(
                requester=create_requester(self.chris),
                target=UserID.from_string(self.chris),
                room_id=self.room_id,
                action=Membership.JOIN,
            )
        )

    @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 1}})
    def test_remote_joins_contribute_to_rate_limit(self) -> None:
        # Join once, to fill the rate limiter bucket.
        #
        # To do this we have to mock the responses from the remote homeserver.
        # We also patch out a bunch of event checks on our end. All we're really
        # trying to check here is that remote joins will bump the rate limiter when
        # they are persisted.
        create_event_source = {
            "auth_events": [],
            "content": {
                "creator": f"@creator:{self.OTHER_SERVER_NAME}",
                "room_version": self.hs.config.server.default_room_version.identifier,
            },
            "depth": 0,
            "origin_server_ts": 0,
            "prev_events": [],
            "room_id": self.intially_unjoined_room_id,
            "sender": f"@creator:{self.OTHER_SERVER_NAME}",
            "state_key": "",
            "type": EventTypes.Create,
        }
        self.add_hashes_and_signatures_from_other_server(
            create_event_source,
            self.hs.config.server.default_room_version,
        )
        create_event = FrozenEventV3(
            create_event_source,
            self.hs.config.server.default_room_version,
            {},
            None,
        )

        join_event_source = {
            "auth_events": [create_event.event_id],
            "content": {"membership": "join"},
            "depth": 1,
            "origin_server_ts": 100,
            "prev_events": [create_event.event_id],
            "sender": self.bob,
            "state_key": self.bob,
            "room_id": self.intially_unjoined_room_id,
            "type": EventTypes.Member,
        }
        add_hashes_and_signatures(
            self.hs.config.server.default_room_version,
            join_event_source,
            self.hs.hostname,
            self.hs.signing_key,
        )
        join_event = FrozenEventV3(
            join_event_source,
            self.hs.config.server.default_room_version,
            {},
            None,
        )

        mock_make_membership_event = Mock(
            return_value=make_awaitable(
                (
                    self.OTHER_SERVER_NAME,
                    join_event,
                    self.hs.config.server.default_room_version,
                )
            )
        )
        mock_send_join = Mock(
            return_value=make_awaitable(
                SendJoinResult(
                    join_event,
                    self.OTHER_SERVER_NAME,
                    state=[create_event],
                    auth_chain=[create_event],
                    partial_state=False,
                    servers_in_room=[],
                )
            )
        )

        with patch.object(
            self.handler.federation_handler.federation_client,
            "make_membership_event",
            mock_make_membership_event,
        ), patch.object(
            self.handler.federation_handler.federation_client,
            "send_join",
            mock_send_join,
        ), patch(
            "synapse.event_auth._is_membership_change_allowed",
            return_value=None,
        ), patch(
            "synapse.handlers.federation_event.check_state_dependent_auth_rules",
            return_value=None,
        ):
            self.get_success(
                self.handler.update_membership(
                    requester=create_requester(self.bob),
                    target=UserID.from_string(self.bob),
                    room_id=self.intially_unjoined_room_id,
                    action=Membership.JOIN,
                    remote_room_hosts=[self.OTHER_SERVER_NAME],
                )
            )

            # Try to join as Chris. Should get denied.
            self.get_failure(
                self.handler.update_membership(
                    requester=create_requester(self.chris),
                    target=UserID.from_string(self.chris),
                    room_id=self.intially_unjoined_room_id,
                    action=Membership.JOIN,
                    remote_room_hosts=[self.OTHER_SERVER_NAME],
                ),
                LimitExceededError,
            )

    # TODO: test that remote joins to a room are rate limited.
    #   Could do this by setting the burst count to 1, then:
    #   - remote-joining a room
    #   - immediately leaving
    #   - trying to remote-join again.


class TestReplicatedJoinsLimitedByPerRoomRateLimiter(RedisMultiWorkerStreamTestCase):
    servlets = [
        synapse.rest.admin.register_servlets,
        synapse.rest.client.login.register_servlets,
        synapse.rest.client.room.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.handler = hs.get_room_member_handler()

        # Create three users.
        self.alice = self.register_user("alice", "pass")
        self.alice_token = self.login("alice", "pass")
        self.bob = self.register_user("bob", "pass")
        self.bob_token = self.login("bob", "pass")
        self.chris = self.register_user("chris", "pass")
        self.chris_token = self.login("chris", "pass")

        # Create a room on this homeserver. Note that this counts as a join: it
        # contributes to the rate limiter's count of actions.
        self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)
        self.intially_unjoined_room_id = "!example:otherhs"

    @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
    def test_local_users_joining_on_another_worker_contribute_to_rate_limit(
        self,
    ) -> None:
        # The rate limiter has accumulated one token from Alice's join after the create
        # event.
        self.replicate()

        # Spawn another worker and have bob join via it.
        worker_app = self.make_worker_hs(
            "synapse.app.generic_worker", extra_config={"worker_name": "other worker"}
        )
        worker_site = self._hs_to_site[worker_app]
        channel = make_request(
            self.reactor,
            worker_site,
            "POST",
            f"/_matrix/client/v3/rooms/{self.room_id}/join",
            access_token=self.bob_token,
        )
        self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)

        # wait for join to arrive over replication
        self.replicate()

        # Try to join as Chris on the worker. Should get denied because Alice
        # and Bob have both joined the room.
        self.get_failure(
            worker_app.get_room_member_handler().update_membership(
                requester=create_requester(self.chris),
                target=UserID.from_string(self.chris),
                room_id=self.room_id,
                action=Membership.JOIN,
            ),
            LimitExceededError,
        )

        # Try to join as Chris on the original worker. Should get denied because Alice
        # and Bob have both joined the room.
        self.get_failure(
            self.handler.update_membership(
                requester=create_requester(self.chris),
                target=UserID.from_string(self.chris),
                room_id=self.room_id,
                action=Membership.JOIN,
            ),
            LimitExceededError,
        )
@@ -225,7 +225,7 @@ class OptionsResourceTests(unittest.TestCase):
             parse_listener_def({"type": "http", "port": 0}),
             self.resource,
             "1.0",
-            max_request_body_size=1234,
+            max_request_body_size=4096,
             reactor=self.reactor,
         )
 
@@ -272,7 +272,7 @@ class FilterEventsForClientTestCase(unittest.FederatingHomeserverTestCase):
             "state_key": "@user:test",
             "content": {"membership": "invite"},
         }
-        self.add_hashes_and_signatures(invite_pdu)
+        self.add_hashes_and_signatures_from_other_server(invite_pdu)
         invite_event_id = make_event_from_dict(invite_pdu, RoomVersions.V9).event_id
 
         self.get_success(
@@ -285,7 +285,7 @@ class HomeserverTestCase(TestCase):
             config=self.hs.config.server.listeners[0],
             resource=self.resource,
             server_version_string="1",
-            max_request_body_size=1234,
+            max_request_body_size=4096,
             reactor=self.reactor,
         )
 

@@ -780,7 +780,7 @@ class FederatingHomeserverTestCase(HomeserverTestCase):
                     verify_key_id,
                     FetchKeyResult(
                         verify_key=verify_key,
-                        valid_until_ts=clock.time_msec() + 1000,
+                        valid_until_ts=clock.time_msec() + 10000,
                     ),
                 )
             ],

@@ -838,7 +838,7 @@ class FederatingHomeserverTestCase(HomeserverTestCase):
             client_ip=client_ip,
         )
 
-    def add_hashes_and_signatures(
+    def add_hashes_and_signatures_from_other_server(
         self,
         event_dict: JsonDict,
         room_version: RoomVersion = KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
@@ -151,6 +151,7 @@ def default_config(name, parse=False):
             "local": {"per_second": 10000, "burst_count": 10000},
             "remote": {"per_second": 10000, "burst_count": 10000},
         },
+        "rc_joins_per_room": {"per_second": 10000, "burst_count": 10000},
         "rc_invites": {
             "per_room": {"per_second": 10000, "burst_count": 10000},
             "per_user": {"per_second": 10000, "burst_count": 10000},