Compare commits

...

23 Commits

Author SHA1 Message Date
David Robertson
80e2e0b4f2 Don't use newsfiles for config changes
per discussion today
2022-07-11 21:12:28 +01:00
David Robertson
5cf6700c2e Monstrous test update of doom 2022-07-11 21:09:52 +01:00
David Robertson
fda6252233 Also notify of join events via another path 2022-07-11 21:09:20 +01:00
David Robertson
47acf465ef Don't require ratelimit to notify for joins 2022-07-11 21:09:20 +01:00
David Robertson
8377172c4c Use add_hashes_and_signatures_from_other_server
- rename the method to distinguish it from `add_hashes_and_signatures`
- use it in a few other places where it makes sense
2022-07-11 21:09:19 +01:00
David Robertson
81eb4ab86a Update rate limiter in the event persister logic
Simpler, cleaner, faster, stronger.
2022-07-08 18:55:21 +01:00
David Robertson
e16294ef3b Be more explicit about the default values 2022-07-08 17:31:59 +01:00
David Robertson
9d9253109b Announce new config option in the changelog 2022-07-08 17:28:51 +01:00
David Robertson
4da8f29ff6 Tweak phrasing; note the version adding the option 2022-07-08 17:24:18 +01:00
David Robertson
dcb16831e8 Move comment translating between bucket terminology 2022-07-08 17:23:41 +01:00
David Robertson
121590a0c9 Changelog 2022-07-04 19:10:15 +01:00
David Robertson
240e32f264 Fixes to unit tests
- FederatingHomeserverTestCase: keys last 10x longer

  I am guessing that this is the first example of a test which calls
  `make_signed_federation_request` after the reactor has advanced by >=
  1 second until now.

- Increase max_request_body_size to 4KB in tests

  The previous limit prevented the master from accepting some
  replication requests which were 1.3kB in size.
2022-07-04 19:10:15 +01:00
David Robertson
7a14b94698 Test cases 2022-07-04 19:10:14 +01:00
David Robertson
0bb4122726 Snoop on replication to learn about joins on other workers 2022-07-04 19:10:14 +01:00
David Robertson
6b47e82ca2 Track per-room join rates actioned by this worker
and consult it when actioning joins.
Only bump rate limit if we will persist the event; otherwise we'll see
it over replication
2022-07-04 19:10:14 +01:00
David Robertson
4230112526 Add helper to determine if we persist event or not 2022-07-04 19:10:14 +01:00
David Robertson
ae788ca796 Replication: include outlier in event rows
Warn about replication problem in upgrade notes
2022-07-04 19:10:14 +01:00
David Robertson
77de15927a Room member: drive-by-comment 2022-07-04 19:10:14 +01:00
David Robertson
9d4cdae33a Notifier: accept callbacks to fire on room joins 2022-07-04 19:10:14 +01:00
David Robertson
c594ab774b Rate limiter: Introduce record_action 2022-07-04 19:10:14 +01:00
David Robertson
c2e3025b33 Rate limiter: Pull out some small methods 2022-07-04 19:10:14 +01:00
David Robertson
453f621d23 Rate limiter: describe leaky bucket 2022-07-04 19:10:14 +01:00
David Robertson
bd873e6571 Define config for room-level join limiter
but don't use it in tests
2022-07-04 19:10:13 +01:00
24 changed files with 692 additions and 43 deletions

View File

@@ -0,0 +1 @@
Add per-room rate limiting for room joins. For each room, Synapse now monitors the rate of join events in that room and throttles additional joins if that rate grows too large.

View File

@@ -67,6 +67,10 @@ rc_joins:
per_second: 9999
burst_count: 9999
rc_joins_per_room:
per_second: 9999
burst_count: 9999
rc_3pid_validation:
per_second: 1000
burst_count: 1000

View File

@@ -89,6 +89,18 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```
# Upgrading to v1.63.0
## Changes to the event replication streams
Synapse now includes a flag indicating if an event is an outlier when
replicating it to other workers. This is a forwards- and backwards-incompatible
change: v1.62 and workers cannot process events replicated by v1.63 workers, and
vice versa.
Once all workers are upgraded to v1.63 (or downgraded to v1.62), event
replication will resume as normal.
# Upgrading to v1.62.0
## New signatures for spam checker callbacks

View File

@@ -1379,6 +1379,25 @@ rc_joins:
per_second: 0.03
burst_count: 12
```
---
### `rc_joins_per_room`
This option allows admins to ratelimit joins to a room based on the number of recent
joins (local or remote) to that room. It is intended to mitigate mass-join spam
waves which target multiple homeservers.
By default, one join is permitted to a room every second, with an accumulating
buffer of up to ten instantaneous joins.
Example configuration (default values):
```yaml
rc_joins_per_room:
per_second: 1
burst_count: 10
```
_Added in Synapse 1.63.0._
---
### `rc_3pid_validation`

View File

@@ -27,6 +27,33 @@ class Ratelimiter:
"""
Ratelimit actions marked by arbitrary keys.
(Note that the source code speaks of "actions" and "burst_count" rather than
"tokens" and a "bucket_size".)
This is a "leaky bucket as a meter". For each key to be tracked there is a bucket
containing some number 0 <= T <= `burst_count` of tokens corresponding to previously
permitted requests for that key. Each bucket starts empty, and gradually leaks
tokens at a rate of `rate_hz`.
Upon an incoming request, we must determine:
- the key that this request falls under (which bucket to inspect), and
- the cost C of this request in tokens.
Then, if there is room in the bucket for C tokens (T + C <= `burst_count`),
the request is permitted and `cost` tokens are added to the bucket.
Otherwise the request is denied, and the bucket continues to hold T tokens.
This means that the limiter enforces an average request frequency of `rate_hz`,
while accumulating a buffer of up to `burst_count` requests which can be consumed
instantaneously.
The tricky bit is the leaking. We do not want to have a periodic process which
leaks every bucket! Instead, we track
- the time point when the bucket was last completely empty, and
- how many tokens have added to the bucket permitted since then.
Then for each incoming request, we can calculate how many tokens have leaked
since this time point, and use that to decide if we should accept or reject the
request.
Args:
clock: A homeserver clock, for retrieving the current time
rate_hz: The long term number of actions that can be performed in a second.
@@ -41,14 +68,36 @@ class Ratelimiter:
self.burst_count = burst_count
self.store = store
# A ordered dictionary keeping track of actions, when they were last
# performed and how often. Each entry is a mapping from a key of arbitrary type
# to a tuple representing:
# * How many times an action has occurred since a point in time
# * The point in time
# * The rate_hz of this particular entry. This can vary per request
# An ordered dictionary representing the token buckets tracked by this rate
# limiter. Each entry maps a key of arbitrary type to a tuple representing:
# * The number of tokens currently in the bucket,
# * The time point when the bucket was last completely empty, and
# * The rate_hz (leak rate) of this particular bucket.
self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()
def _get_key(
self, requester: Optional[Requester], key: Optional[Hashable]
) -> Hashable:
"""Use the requester's MXID as a fallback key if no key is provided.
Pulled out so that `can_do_action` and `record_action` are consistent.
"""
if key is None:
if not requester:
raise ValueError("Must supply at least one of `requester` or `key`")
key = requester.user.to_string()
return key
def _get_action_counts(
self, key: Hashable, time_now_s: float
) -> Tuple[float, float, float]:
"""Retrieve the action counts, with a fallback representing an empty bucket.
Pulled out so that `can_do_action` and `record_action` are consistent.
"""
return self.actions.get(key, (0.0, time_now_s, 0.0))
async def can_do_action(
self,
requester: Optional[Requester],
@@ -88,11 +137,7 @@ class Ratelimiter:
* The reactor timestamp for when the action can be performed next.
-1 if rate_hz is less than or equal to zero
"""
if key is None:
if not requester:
raise ValueError("Must supply at least one of `requester` or `key`")
key = requester.user.to_string()
key = self._get_key(requester, key)
if requester:
# Disable rate limiting of users belonging to any AS that is configured
@@ -121,7 +166,7 @@ class Ratelimiter:
self._prune_message_counts(time_now_s)
# Check if there is an existing count entry for this key
action_count, time_start, _ = self.actions.get(key, (0.0, time_now_s, 0.0))
action_count, time_start, _ = self._get_action_counts(key, time_now_s)
# Check whether performing another action is allowed
time_delta = time_now_s - time_start
@@ -164,6 +209,37 @@ class Ratelimiter:
return allowed, time_allowed
def record_action(
self,
requester: Optional[Requester],
key: Optional[Hashable] = None,
n_actions: int = 1,
_time_now_s: Optional[float] = None,
) -> None:
"""Record that an action(s) took place, even if they violate the rate limit.
This is useful for tracking the frequency of events that happen across
federation which we still want to impose local rate limits on. For instance, if
we are alice.com monitoring a particular room, we cannot prevent bob.com
from joining users to that room. However, we can track the number of recent
joins in the room and refuse to serve new joins ourselves if there have been too
many in the room across both homeservers.
Args:
requester: The requester that is doing the action, if any.
key: An arbitrary key used to classify an action. Defaults to the
requester's user ID.
n_actions: The number of times the user wants to do this action. If the user
cannot do all of the actions, the user's action count is not incremented
at all.
_time_now_s: The current time. Optional, defaults to the current time according
to self.clock. Only used by tests.
"""
key = self._get_key(requester, key)
time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s)
self.actions[key] = (action_count + n_actions, time_start, rate_hz)
def _prune_message_counts(self, time_now_s: float) -> None:
"""Remove message count entries that have not exceeded their defined
rate_hz limit

View File

@@ -112,6 +112,13 @@ class RatelimitConfig(Config):
defaults={"per_second": 0.01, "burst_count": 10},
)
# Track the rate of joins to a given room. If there are too many, temporarily
# prevent local joins and remote joins via this server.
self.rc_joins_per_room = RateLimitConfig(
config.get("rc_joins_per_room", {}),
defaults={"per_second": 1, "burst_count": 10},
)
# Ratelimit cross-user key requests:
# * For local requests this is keyed by the sending device.
# * For requests received over federation this is keyed by the origin.

View File

@@ -117,6 +117,7 @@ class FederationServer(FederationBase):
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()
self._room_member_handler = hs.get_room_member_handler()
self._state_storage_controller = hs.get_storage_controllers().state
@@ -620,6 +621,15 @@ class FederationServer(FederationBase):
)
raise IncompatibleRoomVersionError(room_version=room_version)
# Refuse the request if that room has seen too many joins recently.
# This is in addition to the HS-level rate limiting applied by
# BaseFederationServlet.
# type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
update=False,
)
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
@@ -654,6 +664,12 @@ class FederationServer(FederationBase):
room_id: str,
caller_supports_partial_state: bool = False,
) -> Dict[str, Any]:
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
update=False,
)
event, context = await self._on_send_membership_event(
origin, content, Membership.JOIN, room_id
)

View File

@@ -2063,6 +2063,10 @@ class FederationEventHandler:
event, event_pos, max_stream_token, extra_users=extra_users
)
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
# TODO retrieve the previous state, and exclude join -> join transitions
self._notifier.notify_user_joined_room(event.event_id, event.room_id)
def _sanity_check_event(self, ev: EventBase) -> None:
"""
Do some early sanity checks of a received event

View File

@@ -461,6 +461,7 @@ class EventCreationHandler:
)
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()
self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
@@ -1515,6 +1516,16 @@ class EventCreationHandler:
requester, is_admin_redaction=is_admin_redaction
)
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
(
current_membership,
_,
) = await self.store.get_local_current_membership_for_user_in_room(
event.state_key, event.room_id
)
if current_membership != Membership.JOIN:
self._notifier.notify_user_joined_room(event.event_id, event.room_id)
await self._maybe_kick_guest_users(event, context)
if event.type == EventTypes.CanonicalAlias:

View File

@@ -94,12 +94,29 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
)
# Tracks joins from local users to rooms this server isn't a member of.
# I.e. joins this server makes by requesting /make_join /send_join from
# another server.
self._join_rate_limiter_remote = Ratelimiter(
store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
)
# TODO: find a better place to keep this Ratelimiter.
# It needs to be
# - written to by event persistence code
# - written to by something which can snoop on replication streams
# - read by the RoomMemberHandler to rate limit joins from local users
# - read by the FederationServer to rate limit make_joins and send_joins from
# other homeservers
# I wonder if a homeserver-wide collection of rate limiters might be cleaner?
self._join_rate_per_room_limiter = Ratelimiter(
store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second,
burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count,
)
self._invites_per_room_limiter = Ratelimiter(
store=self.store,
@@ -122,6 +139,18 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
)
self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
Use this to inform the RoomMemberHandler about joins that have either
- taken place on another homeserver, or
- on another worker in this homeserver.
Joins actioned by this worker should use the usual `ratelimit` method, which
checks the limit and increments the counter in one go.
"""
self._join_rate_per_room_limiter.record_action(requester=None, key=room_id)
@abc.abstractmethod
async def _remote_join(
@@ -375,6 +404,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# up blocking profile updates.
if newly_joined and ratelimit:
await self._join_rate_limiter_local.ratelimit(requester)
await self._join_rate_per_room_limiter.ratelimit(
requester, key=room_id, update=False
)
result_event = await self.event_creation_handler.handle_new_client_event(
requester,
@@ -823,6 +855,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
await self._join_rate_limiter_remote.ratelimit(
requester,
)
await self._join_rate_per_room_limiter.ratelimit(
requester,
key=room_id,
update=False,
)
inviter = await self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):

View File

@@ -228,6 +228,7 @@ class Notifier:
# Called when there are new things to stream over replication
self.replication_callbacks: List[Callable[[], None]] = []
self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = []
self._federation_client = hs.get_federation_http_client()
@@ -280,6 +281,19 @@ class Notifier:
"""
self.replication_callbacks.append(cb)
def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
"""Add a callback that will be called when a user joins a room.
This only fires on genuine membership changes, e.g. "invite" -> "join".
Membership transitions like "join" -> "join" (for e.g. displayname changes) do
not trigger the callback.
When called, the callback receives two arguments: the event ID and the room ID.
It should *not* return a Deferred - if it needs to do any asynchronous work, a
background thread should be started and wrapped with run_as_background_process.
"""
self._new_join_in_room_callbacks.append(cb)
async def on_new_room_event(
self,
event: EventBase,
@@ -723,6 +737,10 @@ class Notifier:
for cb in self.replication_callbacks:
cb()
def notify_user_joined_room(self, event_id: str, room_id: str) -> None:
for cb in self._new_join_in_room_callbacks:
cb(event_id, room_id)
def notify_remote_server_up(self, server: str) -> None:
"""Notify any replication that a remote server has come back up"""
# We call federation_sender directly rather than registering as a

View File

@@ -21,7 +21,7 @@ from twisted.internet.interfaces import IAddress, IConnector
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, ReceiptTypes
from synapse.api.constants import EventTypes, Membership, ReceiptTypes
from synapse.federation import send_queue
from synapse.federation.sender import FederationSender
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -219,6 +219,21 @@ class ReplicationDataHandler:
membership=row.data.membership,
)
# If this event is a join, make a note of it so we have an accurate
# cross-worker room rate limit.
# TODO: Erik said we should exclude rows that came from ex_outliers
# here, but I don't see how we can determine that. I guess we could
# add a flag to row.data?
if (
row.data.type == EventTypes.Member
and row.data.membership == Membership.JOIN
and not row.data.outlier
):
# TODO retrieve the previous state, and exclude join -> join transitions
self.notifier.notify_user_joined_room(
row.data.event_id, row.data.room_id
)
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
)

View File

@@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow):
relates_to: Optional[str]
membership: Optional[str]
rejected: bool
outlier: bool
@attr.s(slots=True, frozen=True, auto_attribs=True)

View File

@@ -1465,7 +1465,7 @@ class EventsWorkerStore(SQLBaseStore):
async def get_all_new_forward_event_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
"""Returns new events, for the Events replication stream
Args:
@@ -1481,10 +1481,11 @@ class EventsWorkerStore(SQLBaseStore):
def get_all_new_forward_event_rows(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
" e.outlier"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
@@ -1498,7 +1499,8 @@ class EventsWorkerStore(SQLBaseStore):
)
txn.execute(sql, (last_id, current_id, instance_name, limit))
return cast(
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
txn.fetchall(),
)
return await self.db_pool.runInteraction(
@@ -1507,7 +1509,7 @@ class EventsWorkerStore(SQLBaseStore):
async def get_ex_outlier_stream_rows(
self, instance_name: str, last_id: int, current_id: int
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
"""Returns de-outliered events, for the Events replication stream
Args:
@@ -1522,11 +1524,14 @@ class EventsWorkerStore(SQLBaseStore):
def get_ex_outlier_stream_rows_txn(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
" e.outlier"
" FROM events AS e"
# NB: the next line (inner join) is what makes this query different from
# get_all_new_forward_event_rows.
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
@@ -1541,7 +1546,8 @@ class EventsWorkerStore(SQLBaseStore):
txn.execute(sql, (last_id, current_id, instance_name))
return cast(
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
txn.fetchall(),
)
return await self.db_pool.runInteraction(

View File

@@ -314,3 +314,77 @@ class TestRatelimiter(unittest.HomeserverTestCase):
# Check that we get rate limited after using that token.
self.assertFalse(consume_at(11.1))
def test_record_action_which_doesnt_fill_bucket(self) -> None:
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
)
# Observe two actions, leaving room in the bucket for one more.
limiter.record_action(requester=None, key="a", n_actions=2, _time_now_s=0.0)
# We should be able to take a new action now.
success, _ = self.get_success_or_raise(
limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
)
self.assertTrue(success)
# ... but not two.
success, _ = self.get_success_or_raise(
limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
)
self.assertFalse(success)
def test_record_action_which_fills_bucket(self) -> None:
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
)
# Observe three actions, filling up the bucket.
limiter.record_action(requester=None, key="a", n_actions=3, _time_now_s=0.0)
# We should be unable to take a new action now.
success, _ = self.get_success_or_raise(
limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
)
self.assertFalse(success)
# If we wait 10 seconds to leak a token, we should be able to take one action...
success, _ = self.get_success_or_raise(
limiter.can_do_action(requester=None, key="a", _time_now_s=10.0)
)
self.assertTrue(success)
# ... but not two.
success, _ = self.get_success_or_raise(
limiter.can_do_action(requester=None, key="a", _time_now_s=10.0)
)
self.assertFalse(success)
def test_record_action_which_overfills_bucket(self) -> None:
limiter = Ratelimiter(
store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3
)
# Observe four actions, exceeding the bucket.
limiter.record_action(requester=None, key="a", n_actions=4, _time_now_s=0.0)
# We should be prevented from taking a new action now.
success, _ = self.get_success_or_raise(
limiter.can_do_action(requester=None, key="a", _time_now_s=0.0)
)
self.assertFalse(success)
# If we wait 10 seconds to leak a token, we should be unable to take an action
# because the bucket is still full.
success, _ = self.get_success_or_raise(
limiter.can_do_action(requester=None, key="a", _time_now_s=10.0)
)
self.assertFalse(success)
# But after another 10 seconds we leak a second token, giving us room for
# action.
success, _ = self.get_success_or_raise(
limiter.can_do_action(requester=None, key="a", _time_now_s=20.0)
)
self.assertTrue(success)

View File

@@ -45,7 +45,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
# mock up some events to use in the response.
# In real life, these would have things in `prev_events` and `auth_events`, but that's
# a bit annoying to mock up, and the code under test doesn't care, so we don't bother.
create_event_dict = self.add_hashes_and_signatures(
create_event_dict = self.add_hashes_and_signatures_from_other_server(
{
"room_id": test_room_id,
"type": "m.room.create",
@@ -57,7 +57,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
"origin_server_ts": 500,
}
)
member_event_dict = self.add_hashes_and_signatures(
member_event_dict = self.add_hashes_and_signatures_from_other_server(
{
"room_id": test_room_id,
"type": "m.room.member",
@@ -69,7 +69,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
"origin_server_ts": 600,
}
)
pl_event_dict = self.add_hashes_and_signatures(
pl_event_dict = self.add_hashes_and_signatures_from_other_server(
{
"room_id": test_room_id,
"type": "m.room.power_levels",

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from http import HTTPStatus
from parameterized import parameterized
@@ -20,7 +21,6 @@ from twisted.test.proto_helpers import MemoryReactor
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.config.server import DEFAULT_ROOM_VERSION
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events import make_event_from_dict
from synapse.federation.federation_server import server_matches_acl_event
from synapse.rest import admin
@@ -148,7 +148,7 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
tok2 = self.login("fozzie", "bear")
self.helper.join(self._room_id, second_member_user_id, tok=tok2)
def _make_join(self, user_id) -> JsonDict:
def _make_join(self, user_id: str) -> JsonDict:
channel = self.make_signed_federation_request(
"GET",
f"/_matrix/federation/v1/make_join/{self._room_id}/{user_id}"
@@ -163,11 +163,9 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
join_result = self._make_join(joining_user)
join_event_dict = join_result["event"]
add_hashes_and_signatures(
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
self.add_hashes_and_signatures_from_other_server(
join_event_dict,
signature_name=self.OTHER_SERVER_NAME,
signing_key=self.OTHER_SERVER_SIGNATURE_KEY,
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
)
channel = self.make_signed_federation_request(
"PUT",
@@ -220,11 +218,9 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
join_result = self._make_join(joining_user)
join_event_dict = join_result["event"]
add_hashes_and_signatures(
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
self.add_hashes_and_signatures_from_other_server(
join_event_dict,
signature_name=self.OTHER_SERVER_NAME,
signing_key=self.OTHER_SERVER_SIGNATURE_KEY,
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
)
channel = self.make_signed_federation_request(
"PUT",
@@ -264,6 +260,67 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
)
self.assertEqual(r[("m.room.member", joining_user)].membership, "join")
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}})
def test_make_join_respects_room_join_rate_limit(self) -> None:
# In the test setup, two users join the room. Since the rate limiter burst
# count is 3, a new make_join request to the room should be accepted.
joining_user = "@ronniecorbett:" + self.OTHER_SERVER_NAME
self._make_join(joining_user)
# Now have a new local user join the room. This saturates the rate limiter
# bucket, so the next make_join should be denied.
new_local_user = self.register_user("animal", "animal")
token = self.login("animal", "animal")
self.helper.join(self._room_id, new_local_user, tok=token)
joining_user = "@ronniebarker:" + self.OTHER_SERVER_NAME
channel = self.make_signed_federation_request(
"GET",
f"/_matrix/federation/v1/make_join/{self._room_id}/{joining_user}"
f"?ver={DEFAULT_ROOM_VERSION}",
)
self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body)
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}})
def test_send_join_contributes_to_room_join_rate_limit_and_is_limited(self) -> None:
# Make two make_join requests up front. (These are rate limited, but do not
# contribute to the rate limit.)
join_event_dicts = []
for i in range(2):
joining_user = f"@misspiggy{i}:{self.OTHER_SERVER_NAME}"
join_result = self._make_join(joining_user)
join_event_dict = join_result["event"]
self.add_hashes_and_signatures_from_other_server(
join_event_dict,
KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],
)
join_event_dicts.append(join_event_dict)
# In the test setup, two users join the room. Since the rate limiter burst
# count is 3, the first send_join should be accepted...
channel = self.make_signed_federation_request(
"PUT",
f"/_matrix/federation/v2/send_join/{self._room_id}/join0",
content=join_event_dicts[0],
)
self.assertEqual(channel.code, 200, channel.json_body)
# ... but the second should be denied.
channel = self.make_signed_federation_request(
"PUT",
f"/_matrix/federation/v2/send_join/{self._room_id}/join1",
content=join_event_dicts[1],
)
self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body)
# NB: we could write a test which checks that the send_join event is seen
# by other workers over replication, and that they update their rate limit
# buckets accordingly. I'm going to assume that the join event gets sent over
# replication, at which point the tests.handlers.room_member test
# test_local_users_joining_on_another_worker_contribute_to_rate_limit
# is probably sufficient to reassure that the bucket is updated.
def _create_acl_event(content):
return make_event_from_dict(

View File

@@ -256,7 +256,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
]
for _ in range(0, 8):
event = make_event_from_dict(
self.add_hashes_and_signatures(
self.add_hashes_and_signatures_from_other_server(
{
"origin_server_ts": 1,
"type": "m.room.message",

View File

@@ -104,7 +104,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
# mock up a load of state events which we are missing
state_events = [
make_event_from_dict(
self.add_hashes_and_signatures(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_state_type",
"state_key": f"state_{i}",
@@ -131,7 +131,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
# Depending on the test, we either persist this upfront (as an outlier),
# or let the server request it.
prev_event = make_event_from_dict(
self.add_hashes_and_signatures(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
@@ -165,7 +165,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
# mock up a regular event to pass into _process_pulled_event
pulled_event = make_event_from_dict(
self.add_hashes_and_signatures(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,

View File

@@ -0,0 +1,290 @@
from http import HTTPStatus
from unittest.mock import Mock, patch
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
import synapse.rest.client.login
import synapse.rest.client.room
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import LimitExceededError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events import FrozenEventV3
from synapse.federation.federation_client import SendJoinResult
from synapse.server import HomeServer
from synapse.types import UserID, create_requester
from synapse.util import Clock
from tests.replication._base import RedisMultiWorkerStreamTestCase
from tests.server import make_request
from tests.test_utils import make_awaitable
from tests.unittest import FederatingHomeserverTestCase, override_config
class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
synapse.rest.client.login.register_servlets,
synapse.rest.client.room.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.handler = hs.get_room_member_handler()
# Create three users.
self.alice = self.register_user("alice", "pass")
self.alice_token = self.login("alice", "pass")
self.bob = self.register_user("bob", "pass")
self.bob_token = self.login("bob", "pass")
self.chris = self.register_user("chris", "pass")
self.chris_token = self.login("chris", "pass")
# Create a room on this homeserver. Note that this counts as a join: it
# contributes to the rate limter's count of actions
self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)
self.intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}"
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
def test_local_user_local_joins_contribute_to_limit_and_are_limited(self) -> None:
# The rate limiter has accumulated one token from Alice's join after the create
# event.
# Try joining the room as Bob.
self.get_success(
self.handler.update_membership(
requester=create_requester(self.bob),
target=UserID.from_string(self.bob),
room_id=self.room_id,
action=Membership.JOIN,
)
)
# The rate limiter bucket is full. A second join should be denied.
self.get_failure(
self.handler.update_membership(
requester=create_requester(self.chris),
target=UserID.from_string(self.chris),
room_id=self.room_id,
action=Membership.JOIN,
),
LimitExceededError,
)
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
def test_local_user_profile_edits_dont_contribute_to_limit(self) -> None:
# The rate limiter has accumulated one token from Alice's join after the create
# event. Alice should still be able to change her displayname.
self.get_success(
self.handler.update_membership(
requester=create_requester(self.alice),
target=UserID.from_string(self.alice),
room_id=self.room_id,
action=Membership.JOIN,
content={"displayname": "Alice Cooper"},
)
)
# Still room in the limiter bucket. Chris's join should be accepted.
self.get_success(
self.handler.update_membership(
requester=create_requester(self.chris),
target=UserID.from_string(self.chris),
room_id=self.room_id,
action=Membership.JOIN,
)
)
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 1}})
def test_remote_joins_contribute_to_rate_limit(self) -> None:
# Join once, to fill the rate limiter bucket.
#
# To do this we have to mock the responses from the remote homeserver.
# We also patch out a bunch of event checks on our end. All we're really
# trying to check here is that remote joins will bump the rate limter when
# they are persisted.
create_event_source = {
"auth_events": [],
"content": {
"creator": f"@creator:{self.OTHER_SERVER_NAME}",
"room_version": self.hs.config.server.default_room_version.identifier,
},
"depth": 0,
"origin_server_ts": 0,
"prev_events": [],
"room_id": self.intially_unjoined_room_id,
"sender": f"@creator:{self.OTHER_SERVER_NAME}",
"state_key": "",
"type": EventTypes.Create,
}
self.add_hashes_and_signatures_from_other_server(
create_event_source,
self.hs.config.server.default_room_version,
)
create_event = FrozenEventV3(
create_event_source,
self.hs.config.server.default_room_version,
{},
None,
)
join_event_source = {
"auth_events": [create_event.event_id],
"content": {"membership": "join"},
"depth": 1,
"origin_server_ts": 100,
"prev_events": [create_event.event_id],
"sender": self.bob,
"state_key": self.bob,
"room_id": self.intially_unjoined_room_id,
"type": EventTypes.Member,
}
add_hashes_and_signatures(
self.hs.config.server.default_room_version,
join_event_source,
self.hs.hostname,
self.hs.signing_key,
)
join_event = FrozenEventV3(
join_event_source,
self.hs.config.server.default_room_version,
{},
None,
)
mock_make_membership_event = Mock(
return_value=make_awaitable(
(
self.OTHER_SERVER_NAME,
join_event,
self.hs.config.server.default_room_version,
)
)
)
mock_send_join = Mock(
return_value=make_awaitable(
SendJoinResult(
join_event,
self.OTHER_SERVER_NAME,
state=[create_event],
auth_chain=[create_event],
partial_state=False,
servers_in_room=[],
)
)
)
with patch.object(
self.handler.federation_handler.federation_client,
"make_membership_event",
mock_make_membership_event,
), patch.object(
self.handler.federation_handler.federation_client,
"send_join",
mock_send_join,
), patch(
"synapse.event_auth._is_membership_change_allowed",
return_value=None,
), patch(
"synapse.handlers.federation_event.check_state_dependent_auth_rules",
return_value=None,
):
self.get_success(
self.handler.update_membership(
requester=create_requester(self.bob),
target=UserID.from_string(self.bob),
room_id=self.intially_unjoined_room_id,
action=Membership.JOIN,
remote_room_hosts=[self.OTHER_SERVER_NAME],
)
)
# Try to join as Chris. Should get denied.
self.get_failure(
self.handler.update_membership(
requester=create_requester(self.chris),
target=UserID.from_string(self.chris),
room_id=self.intially_unjoined_room_id,
action=Membership.JOIN,
remote_room_hosts=[self.OTHER_SERVER_NAME],
),
LimitExceededError,
)
# TODO: test that remote joins to a room are rate limited.
# Could do this by setting the burst count to 1, then:
# - remote-joining a room
# - immediately leaving
# - trying to remote-join again.
class TestReplicatedJoinsLimitedByPerRoomRateLimiter(RedisMultiWorkerStreamTestCase):
servlets = [
synapse.rest.admin.register_servlets,
synapse.rest.client.login.register_servlets,
synapse.rest.client.room.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.handler = hs.get_room_member_handler()
# Create three users.
self.alice = self.register_user("alice", "pass")
self.alice_token = self.login("alice", "pass")
self.bob = self.register_user("bob", "pass")
self.bob_token = self.login("bob", "pass")
self.chris = self.register_user("chris", "pass")
self.chris_token = self.login("chris", "pass")
# Create a room on this homeserver.
# Note that this counts as a
self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)
self.intially_unjoined_room_id = "!example:otherhs"
@override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
def test_local_users_joining_on_another_worker_contribute_to_rate_limit(
self,
) -> None:
# The rate limiter has accumulated one token from Alice's join after the create
# event.
self.replicate()
# Spawn another worker and have bob join via it.
worker_app = self.make_worker_hs(
"synapse.app.generic_worker", extra_config={"worker_name": "other worker"}
)
worker_site = self._hs_to_site[worker_app]
channel = make_request(
self.reactor,
worker_site,
"POST",
f"/_matrix/client/v3/rooms/{self.room_id}/join",
access_token=self.bob_token,
)
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
# wait for join to arrive over replication
self.replicate()
# Try to join as Chris on the worker. Should get denied because Alice
# and Bob have both joined the room.
self.get_failure(
worker_app.get_room_member_handler().update_membership(
requester=create_requester(self.chris),
target=UserID.from_string(self.chris),
room_id=self.room_id,
action=Membership.JOIN,
),
LimitExceededError,
)
# Try to join as Chris on the original worker. Should get denied because Alice
# and Bob have both joined the room.
self.get_failure(
self.handler.update_membership(
requester=create_requester(self.chris),
target=UserID.from_string(self.chris),
room_id=self.room_id,
action=Membership.JOIN,
),
LimitExceededError,
)

View File

@@ -225,7 +225,7 @@ class OptionsResourceTests(unittest.TestCase):
parse_listener_def({"type": "http", "port": 0}),
self.resource,
"1.0",
max_request_body_size=1234,
max_request_body_size=4096,
reactor=self.reactor,
)

View File

@@ -272,7 +272,7 @@ class FilterEventsForClientTestCase(unittest.FederatingHomeserverTestCase):
"state_key": "@user:test",
"content": {"membership": "invite"},
}
self.add_hashes_and_signatures(invite_pdu)
self.add_hashes_and_signatures_from_other_server(invite_pdu)
invite_event_id = make_event_from_dict(invite_pdu, RoomVersions.V9).event_id
self.get_success(

View File

@@ -285,7 +285,7 @@ class HomeserverTestCase(TestCase):
config=self.hs.config.server.listeners[0],
resource=self.resource,
server_version_string="1",
max_request_body_size=1234,
max_request_body_size=4096,
reactor=self.reactor,
)
@@ -780,7 +780,7 @@ class FederatingHomeserverTestCase(HomeserverTestCase):
verify_key_id,
FetchKeyResult(
verify_key=verify_key,
valid_until_ts=clock.time_msec() + 1000,
valid_until_ts=clock.time_msec() + 10000,
),
)
],
@@ -838,7 +838,7 @@ class FederatingHomeserverTestCase(HomeserverTestCase):
client_ip=client_ip,
)
def add_hashes_and_signatures(
def add_hashes_and_signatures_from_other_server(
self,
event_dict: JsonDict,
room_version: RoomVersion = KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION],

View File

@@ -151,6 +151,7 @@ def default_config(name, parse=False):
"local": {"per_second": 10000, "burst_count": 10000},
"remote": {"per_second": 10000, "burst_count": 10000},
},
"rc_joins_per_room": {"per_second": 10000, "burst_count": 10000},
"rc_invites": {
"per_room": {"per_second": 10000, "burst_count": 10000},
"per_user": {"per_second": 10000, "burst_count": 10000},