mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Lift pausing on ratelimited requests to http layer (#18595)
When a request gets ratelimited we (optionally) wait ~500ms before returning to mitigate clients that like to tightloop on request failures. However, this is currently implemented by pausing request processing when we check for ratelimits, which might be deep within request processing, and e.g. while locks are held. Instead, let's hoist the pause to the very top of the HTTP handler. Hopefully, this mitigates the issue where a user sending lots of events to a single room can see their requests time out due to the combination of the linearizer and the pausing of the request. Instead, they should see the requests 429 after ~500ms. The first commit is a refactor to pass the `Clock` to `AsyncResource`, the second commit is the behavioural change.
This commit is contained in:
1
changelog.d/18595.misc
Normal file
1
changelog.d/18595.misc
Normal file
@@ -0,0 +1 @@
|
||||
Better handling of ratelimited requests.
|
||||
@@ -527,7 +527,11 @@ class InvalidCaptchaError(SynapseError):
|
||||
|
||||
|
||||
class LimitExceededError(SynapseError):
|
||||
"""A client has sent too many requests and is being throttled."""
|
||||
"""A client has sent too many requests and is being throttled.
|
||||
|
||||
Args:
|
||||
pause: Optional time in seconds to pause before responding to the client.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -535,6 +539,7 @@ class LimitExceededError(SynapseError):
|
||||
code: int = 429,
|
||||
retry_after_ms: Optional[int] = None,
|
||||
errcode: str = Codes.LIMIT_EXCEEDED,
|
||||
pause: Optional[float] = None,
|
||||
):
|
||||
# Use HTTP header Retry-After to enable library-assisted retry handling.
|
||||
headers = (
|
||||
@@ -545,6 +550,7 @@ class LimitExceededError(SynapseError):
|
||||
super().__init__(code, "Too Many Requests", errcode, headers=headers)
|
||||
self.retry_after_ms = retry_after_ms
|
||||
self.limiter_name = limiter_name
|
||||
self.pause = pause
|
||||
|
||||
def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
|
||||
return cs_error(self.msg, self.errcode, retry_after_ms=self.retry_after_ms)
|
||||
|
||||
@@ -338,12 +338,10 @@ class Ratelimiter:
|
||||
)
|
||||
|
||||
if not allowed:
|
||||
if pause:
|
||||
await self.clock.sleep(pause)
|
||||
|
||||
raise LimitExceededError(
|
||||
limiter_name=self._limiter_name,
|
||||
retry_after_ms=int(1000 * (time_allowed - time_now_s)),
|
||||
pause=pause,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ class AdditionalResource(DirectServeJsonResource):
|
||||
hs: homeserver
|
||||
handler: function to be called to handle the request.
|
||||
"""
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._handler = handler
|
||||
|
||||
async def _async_render(self, request: Request) -> Optional[Tuple[int, Any]]:
|
||||
|
||||
@@ -106,7 +106,7 @@ class ProxyResource(_AsyncResource):
|
||||
isLeaf = True
|
||||
|
||||
def __init__(self, reactor: ISynapseReactor, hs: "HomeServer"):
|
||||
super().__init__(True)
|
||||
super().__init__(hs.get_clock(), True)
|
||||
|
||||
self.reactor = reactor
|
||||
self.agent = hs.get_federation_http_client().agent
|
||||
|
||||
@@ -67,6 +67,7 @@ from twisted.web.util import redirectTo
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException,
|
||||
Codes,
|
||||
LimitExceededError,
|
||||
RedirectException,
|
||||
SynapseError,
|
||||
UnrecognizedRequestError,
|
||||
@@ -74,7 +75,7 @@ from synapse.api.errors import (
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
|
||||
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util import Clock, json_encoder
|
||||
from synapse.util.caches import intern_dict
|
||||
from synapse.util.cancellation import is_function_cancellable
|
||||
from synapse.util.iterutils import chunk_seq
|
||||
@@ -308,9 +309,10 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
|
||||
context from the request the servlet is handling.
|
||||
"""
|
||||
|
||||
def __init__(self, extract_context: bool = False):
|
||||
def __init__(self, clock: Clock, extract_context: bool = False):
|
||||
super().__init__()
|
||||
|
||||
self._clock = clock
|
||||
self._extract_context = extract_context
|
||||
|
||||
def render(self, request: "SynapseRequest") -> int:
|
||||
@@ -329,7 +331,12 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
|
||||
request.request_metrics.name = self.__class__.__name__
|
||||
|
||||
with trace_servlet(request, self._extract_context):
|
||||
callback_return = await self._async_render(request)
|
||||
try:
|
||||
callback_return = await self._async_render(request)
|
||||
except LimitExceededError as e:
|
||||
if e.pause:
|
||||
self._clock.sleep(e.pause)
|
||||
raise
|
||||
|
||||
if callback_return is not None:
|
||||
code, response = callback_return
|
||||
@@ -393,8 +400,10 @@ class DirectServeJsonResource(_AsyncResource):
|
||||
formatting responses and errors as JSON.
|
||||
"""
|
||||
|
||||
def __init__(self, canonical_json: bool = False, extract_context: bool = False):
|
||||
super().__init__(extract_context)
|
||||
def __init__(
|
||||
self, clock: Clock, canonical_json: bool = False, extract_context: bool = False
|
||||
):
|
||||
super().__init__(clock, extract_context)
|
||||
self.canonical_json = canonical_json
|
||||
|
||||
def _send_response(
|
||||
@@ -450,8 +459,8 @@ class JsonResource(DirectServeJsonResource):
|
||||
canonical_json: bool = True,
|
||||
extract_context: bool = False,
|
||||
):
|
||||
super().__init__(canonical_json, extract_context)
|
||||
self.clock = hs.get_clock()
|
||||
super().__init__(self.clock, canonical_json, extract_context)
|
||||
# Map of path regex -> method -> callback.
|
||||
self._routes: Dict[Pattern[str], Dict[bytes, _PathEntry]] = {}
|
||||
self.hs = hs
|
||||
|
||||
@@ -81,7 +81,7 @@ class ConsentResource(DirectServeHtmlResource):
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
|
||||
self.hs = hs
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
@@ -44,7 +44,7 @@ class FederationWhitelistResource(DirectServeJsonResource):
|
||||
PATH = "/_synapse/client/v1/config/federation_whitelist"
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
|
||||
self._federation_whitelist = hs.config.federation.federation_domain_whitelist
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class JwksResource(DirectServeJsonResource):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(extract_context=True)
|
||||
super().__init__(hs.get_clock(), extract_context=True)
|
||||
|
||||
# Parameters that are allowed to be exposed in the public key.
|
||||
# This is done manually, because authlib's private to public key conversion
|
||||
|
||||
@@ -48,7 +48,7 @@ class NewUserConsentResource(DirectServeHtmlResource):
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._sso_handler = hs.get_sso_handler()
|
||||
self._server_name = hs.hostname
|
||||
self._consent_version = hs.config.consent.user_consent_version
|
||||
|
||||
@@ -35,7 +35,7 @@ class OIDCBackchannelLogoutResource(DirectServeJsonResource):
|
||||
isLeaf = 1
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._oidc_handler = hs.get_oidc_handler()
|
||||
|
||||
async def _async_render_POST(self, request: SynapseRequest) -> None:
|
||||
|
||||
@@ -35,7 +35,7 @@ class OIDCCallbackResource(DirectServeHtmlResource):
|
||||
isLeaf = 1
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._oidc_handler = hs.get_oidc_handler()
|
||||
|
||||
async def _async_render_GET(self, request: SynapseRequest) -> None:
|
||||
|
||||
@@ -47,7 +47,7 @@ class PasswordResetSubmitTokenResource(DirectServeHtmlResource):
|
||||
Args:
|
||||
hs: server
|
||||
"""
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
@@ -44,7 +44,7 @@ class PickIdpResource(DirectServeHtmlResource):
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._sso_handler = hs.get_sso_handler()
|
||||
self._sso_login_idp_picker_template = (
|
||||
hs.config.sso.sso_login_idp_picker_template
|
||||
|
||||
@@ -62,7 +62,7 @@ def pick_username_resource(hs: "HomeServer") -> Resource:
|
||||
|
||||
class AvailabilityCheckResource(DirectServeJsonResource):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._sso_handler = hs.get_sso_handler()
|
||||
|
||||
async def _async_render_GET(self, request: Request) -> Tuple[int, JsonDict]:
|
||||
@@ -78,7 +78,7 @@ class AvailabilityCheckResource(DirectServeJsonResource):
|
||||
|
||||
class AccountDetailsResource(DirectServeHtmlResource):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._sso_handler = hs.get_sso_handler()
|
||||
|
||||
def template_search_dirs() -> Generator[str, None, None]:
|
||||
|
||||
@@ -30,7 +30,7 @@ class MSC4108RendezvousSessionResource(DirectServeJsonResource):
|
||||
isLeaf = True
|
||||
|
||||
def __init__(self, hs: "HomeServer") -> None:
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._handler = hs.get_rendezvous_handler()
|
||||
|
||||
async def _async_render_GET(self, request: SynapseRequest) -> None:
|
||||
|
||||
@@ -35,7 +35,7 @@ class SAML2ResponseResource(DirectServeHtmlResource):
|
||||
isLeaf = 1
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._saml_handler = hs.get_saml_handler()
|
||||
self._sso_handler = hs.get_sso_handler()
|
||||
|
||||
|
||||
@@ -43,7 +43,7 @@ class SsoRegisterResource(DirectServeHtmlResource):
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._sso_handler = hs.get_sso_handler()
|
||||
|
||||
async def _async_render_GET(self, request: Request) -> None:
|
||||
|
||||
@@ -38,7 +38,7 @@ class UnsubscribeResource(DirectServeHtmlResource):
|
||||
SUCCESS_HTML = b"<html><body>You have been unsubscribed</body><html>"
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self.notifier = hs.get_notifier()
|
||||
self.auth = hs.get_auth()
|
||||
self.pusher_pool = hs.get_pusherpool()
|
||||
|
||||
@@ -86,7 +86,7 @@ class ClientWellKnownResource(DirectServeJsonResource):
|
||||
isLeaf = 1
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
super().__init__(hs.get_clock())
|
||||
self._well_known_builder = WellKnownBuilder(hs)
|
||||
|
||||
async def _async_render_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
|
||||
@@ -316,15 +316,16 @@ class WrapHtmlRequestHandlerTests(unittest.TestCase):
|
||||
await self.callback(request)
|
||||
|
||||
def setUp(self) -> None:
|
||||
reactor, _ = get_clock()
|
||||
reactor, clock = get_clock()
|
||||
self.reactor = reactor
|
||||
self.clock = clock
|
||||
|
||||
def test_good_response(self) -> None:
|
||||
async def callback(request: SynapseRequest) -> None:
|
||||
request.write(b"response")
|
||||
request.finish()
|
||||
|
||||
res = WrapHtmlRequestHandlerTests.TestResource()
|
||||
res = WrapHtmlRequestHandlerTests.TestResource(self.clock)
|
||||
res.callback = callback
|
||||
|
||||
channel = make_request(
|
||||
@@ -344,7 +345,7 @@ class WrapHtmlRequestHandlerTests(unittest.TestCase):
|
||||
async def callback(request: SynapseRequest, **kwargs: object) -> None:
|
||||
raise RedirectException(b"/look/an/eagle", 301)
|
||||
|
||||
res = WrapHtmlRequestHandlerTests.TestResource()
|
||||
res = WrapHtmlRequestHandlerTests.TestResource(self.clock)
|
||||
res.callback = callback
|
||||
|
||||
channel = make_request(
|
||||
@@ -366,7 +367,7 @@ class WrapHtmlRequestHandlerTests(unittest.TestCase):
|
||||
e.cookies.append(b"session=yespls")
|
||||
raise e
|
||||
|
||||
res = WrapHtmlRequestHandlerTests.TestResource()
|
||||
res = WrapHtmlRequestHandlerTests.TestResource(self.clock)
|
||||
res.callback = callback
|
||||
|
||||
channel = make_request(
|
||||
@@ -387,7 +388,7 @@ class WrapHtmlRequestHandlerTests(unittest.TestCase):
|
||||
request.write(b"response")
|
||||
request.finish()
|
||||
|
||||
res = WrapHtmlRequestHandlerTests.TestResource()
|
||||
res = WrapHtmlRequestHandlerTests.TestResource(self.clock)
|
||||
res.callback = callback
|
||||
|
||||
channel = make_request(
|
||||
@@ -400,7 +401,7 @@ class WrapHtmlRequestHandlerTests(unittest.TestCase):
|
||||
|
||||
class CancellableDirectServeJsonResource(DirectServeJsonResource):
|
||||
def __init__(self, clock: Clock):
|
||||
super().__init__()
|
||||
super().__init__(clock)
|
||||
self.clock = clock
|
||||
|
||||
@cancellable
|
||||
@@ -417,7 +418,7 @@ class CancellableDirectServeHtmlResource(DirectServeHtmlResource):
|
||||
ERROR_TEMPLATE = "{code} {msg}"
|
||||
|
||||
def __init__(self, clock: Clock):
|
||||
super().__init__()
|
||||
super().__init__(clock)
|
||||
self.clock = clock
|
||||
|
||||
@cancellable
|
||||
|
||||
Reference in New Issue
Block a user