Compare commits

...

13 Commits

Author SHA1 Message Date
Erik Johnston  8410021d1f  Fixup  2022-04-21 11:49:51 +01:00
Erik Johnston  e16ec8f804  Mark remote server as up when we can talk to it  2022-04-21 11:49:51 +01:00
Erik Johnston  1e19c79a5d  Reduce default max retry times  2022-04-21 11:49:51 +01:00
H. Shay  738742f439  add simple test  2022-04-21 11:49:51 +01:00
H. Shay  f304297486  lint  2022-04-21 11:49:51 +01:00
H. Shay  1736be0678  add max_short_retries to config and rename min_retry_delay -> max_short_retry_delay  2022-04-21 11:49:51 +01:00
H. Shay  6b38d55755  fix newsfragment  2022-04-21 11:49:51 +01:00
H. Shay  0a3fd49bb3  newsframent  2022-04-21 11:49:51 +01:00
H. Shay  312f489919  make matrix fed requests retries configurable  2022-04-21 11:49:51 +01:00
Erik Johnston  9823af0744  Reduce retry interval  2022-04-21 11:49:51 +01:00
Erik Johnston  6cb4339f42  Newsfile  2022-04-21 11:49:51 +01:00
Erik Johnston  a917a36936  Immediate retry any requests that have backed off when a server comes back online  2022-04-21 11:49:51 +01:00
Erik Johnston  fe0048c2f7  Add a AwakenableSleeper class  2022-04-21 11:49:51 +01:00
9 changed files with 176 additions and 18 deletions

changelog.d/12500.misc Normal file
View File

@@ -0,0 +1 @@
+Immediately retry any requests that have backed off when a server comes back online.

changelog.d/12504.misc Normal file
View File

@@ -0,0 +1 @@
+Allow for the configuration of max request retries and min/max retry delays in the matrix federation client.

View File

@@ -81,3 +81,10 @@ class ExperimentalConfig(Config):
         # MSC2654: Unread counts
         self.msc2654_enabled: bool = experimental.get("msc2654_enabled", False)
 
+        # Allow for the configuration of max request retries and min/max retry delays
+        # in the matrix federation client
+        self.max_long_retry_delay = experimental.get("max_long_retry_delay", 10)
+        self.max_short_retry_delay = experimental.get("max_short_retry_delay", 2)
+        self.max_long_retries = experimental.get("max_long_retries", 10)
+        self.max_short_retries = experimental.get("max_short_retries", 3)
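
The four new options are read from the experimental config section with the defaults shown above. A minimal sketch of the corresponding settings as a config dict (illustrative only, not part of the diff; the "experimental_features" key name is taken from the override_config call in the test at the end of this compare, and the delay values appear to be seconds, matching the min(delay, ...) caps they replace in the federation client):

# Sketch only: the new knobs with their upstream defaults.
config = {
    "experimental_features": {
        "max_long_retry_delay": 10,   # cap on the long-retry backoff (seconds, assumed)
        "max_short_retry_delay": 2,   # cap on the short-retry backoff (seconds, assumed)
        "max_long_retries": 10,       # attempts when long_retries=True
        "max_short_retries": 3,       # attempts otherwise
    }
}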

View File

@@ -73,7 +73,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import set_tag, start_active_span, tags
 from synapse.types import JsonDict
 from synapse.util import json_decoder
-from synapse.util.async_helpers import timeout_deferred
+from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -92,8 +92,6 @@ incoming_responses_counter = Counter(
 # need a generous limit here.
 MAX_RESPONSE_SIZE = 100 * 1024 * 1024
 
-MAX_LONG_RETRIES = 10
-MAX_SHORT_RETRIES = 3
 
 MAXINT = sys.maxsize
@@ -348,11 +346,23 @@ class MatrixFederationHttpClient:
         self.version_string_bytes = hs.version_string.encode("ascii")
         self.default_timeout = 60
 
+        self.max_long_retry_delay = hs.config.experimental.max_long_retry_delay
+        self.max_short_retry_delay = hs.config.experimental.max_short_retry_delay
+        self.max_long_retries = hs.config.experimental.max_long_retries
+        self.max_short_retries = hs.config.experimental.max_short_retries
+
         def schedule(x):
             self.reactor.callLater(_EPSILON, x)
 
         self._cooperator = Cooperator(scheduler=schedule)
 
+        self._sleeper = AwakenableSleeper(self.reactor)
+
+    def wake_destination(self, destination: str) -> None:
+        """Called when the remote server may have come back online."""
+        self._sleeper.wake(destination)
+
     async def _send_request_with_optional_trailing_slash(
         self,
         request: MatrixFederationRequest,
@@ -474,6 +484,8 @@ class MatrixFederationHttpClient:
             self._store,
             backoff_on_404=backoff_on_404,
             ignore_backoff=ignore_backoff,
+            notifier=self.hs.get_notifier(),
+            replication_client=self.hs.get_replication_command_handler(),
         )
 
         method_bytes = request.method.encode("ascii")
@@ -502,9 +514,9 @@ class MatrixFederationHttpClient:
         # XXX: Would be much nicer to retry only at the transaction-layer
         # (once we have reliable transactions in place)
         if long_retries:
-            retries_left = MAX_LONG_RETRIES
+            retries_left = self.max_long_retries
         else:
-            retries_left = MAX_SHORT_RETRIES
+            retries_left = self.max_short_retries
 
         url_bytes = request.uri
         url_str = url_bytes.decode("ascii")
@@ -649,12 +661,12 @@
                     if retries_left and not timeout:
                         if long_retries:
-                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
-                            delay = min(delay, 60)
+                            delay = 4 ** (self.max_long_retries + 1 - retries_left)
+                            delay = min(delay, self.max_long_retry_delay)
                             delay *= random.uniform(0.8, 1.4)
                         else:
-                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
-                            delay = min(delay, 2)
+                            delay = 0.5 * 2 ** (self.max_short_retries - retries_left)
+                            delay = min(delay, self.max_short_retry_delay)
                             delay *= random.uniform(0.8, 1.4)
 
                         logger.debug(
@@ -664,7 +676,9 @@
                            delay,
                         )
 
-                        await self.clock.sleep(delay)
+                        # Sleep for the calculated delay, or wake up immediately
+                        # if we get notified that the server is back up.
+                        await self._sleeper.sleep(request.destination, delay * 1000)
                         retries_left -= 1
                     else:
                         raise
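
Together these hunks swap the hard-coded retry constants for the configurable per-instance values and route the back-off sleep through the new AwakenableSleeper (note the delay is converted to milliseconds for the sleeper). A standalone sketch of the resulting delay schedule with the default settings (illustrative only, not part of the diff):

import random

def backoff_delays(long_retries, max_long_retries=10, max_long_retry_delay=10,
                   max_short_retries=3, max_short_retry_delay=2):
    # Mirrors the hunk above: exponential growth, capped, then jittered by 0.8-1.4.
    retries_left = max_long_retries if long_retries else max_short_retries
    while retries_left:
        if long_retries:
            delay = min(4 ** (max_long_retries + 1 - retries_left), max_long_retry_delay)
        else:
            delay = min(0.5 * 2 ** (max_short_retries - retries_left), max_short_retry_delay)
        yield delay * random.uniform(0.8, 1.4)
        retries_left -= 1

# Ignoring jitter, the short path yields roughly 0.5s, 1s, 2s and the long path
# 4s, 10s, 10s, ... (capped by max_long_retry_delay).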

View File

@@ -228,9 +228,7 @@ class Notifier:
         # Called when there are new things to stream over replication
         self.replication_callbacks: List[Callable[[], None]] = []
 
-        # Called when remote servers have come back online after having been
-        # down.
-        self.remote_server_up_callbacks: List[Callable[[str], None]] = []
+        self._federation_client = hs.get_federation_http_client()
 
         self._third_party_rules = hs.get_third_party_event_rules()
@@ -731,3 +729,7 @@ class Notifier:
         # circular dependencies.
         if self.federation_sender:
             self.federation_sender.wake_destination(server)
+
+        # Tell the federation client about the fact the server is back up, so
+        # that any in flight requests can be immediately retried.
+        self._federation_client.wake_destination(server)

View File

@@ -734,3 +734,60 @@ def delay_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
     new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
     deferred.chainDeferred(new_deferred)
     return new_deferred
+
+
+class AwakenableSleeper:
+    """Allows explicitly waking up deferreds related to an entity that are
+    currently sleeping.
+    """
+
+    def __init__(self, reactor: IReactorTime) -> None:
+        self._streams: Dict[str, Set[defer.Deferred[None]]] = {}
+        self._reactor = reactor
+
+    def wake(self, name: str) -> None:
+        """Wake everything related to `name` that is currently sleeping."""
+        stream_set = self._streams.pop(name, set())
+        for deferred in set(stream_set):
+            try:
+                with PreserveLoggingContext():
+                    deferred.callback(None)
+            except Exception:
+                pass
+
+    async def sleep(self, name: str, delay_ms: int) -> None:
+        """Sleep for the given number of milliseconds, or return if the given
+        `name` is explicitly woken up.
+        """
+
+        # Create a deferred that gets called in N seconds
+        sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
+        call = self._reactor.callLater(delay_ms / 1000, sleep_deferred.callback, None)
+
+        # Create a deferred that will get called if `wake` is called with
+        # the same `name`.
+        stream_set = self._streams.setdefault(name, set())
+        notify_deferred: "defer.Deferred[None]" = defer.Deferred()
+        stream_set.add(notify_deferred)
+
+        try:
+            # Wait for either the delay or for `wake` to be called.
+            await make_deferred_yieldable(
+                defer.DeferredList(
+                    [sleep_deferred, notify_deferred],
+                    fireOnOneCallback=True,
+                    fireOnOneErrback=True,
+                    consumeErrors=True,
+                )
+            )
+        finally:
+            # Clean up the state
+            stream_set.discard(notify_deferred)
+            curr_stream_set = self._streams.get(name)
+            if curr_stream_set is not None and len(curr_stream_set) == 0:
+                self._streams.pop(name)
+
+            # Cancel the sleep if we were woken up
+            if call.active():
+                call.cancel()
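
A rough usage sketch of the new class (illustrative only, not part of the diff; twisted.internet.task.Clock stands in for the real reactor, much as get_clock() does in the tests further down, and the destination name is made up):

from twisted.internet import defer, task

from synapse.util.async_helpers import AwakenableSleeper

clock = task.Clock()                    # fake IReactorTime, for illustration
sleeper = AwakenableSleeper(clock)

# Two in-flight requests backing off against the same destination.
d1 = defer.ensureDeferred(sleeper.sleep("remote.example.com", 60_000))
d2 = defer.ensureDeferred(sleeper.sleep("remote.example.com", 60_000))

clock.advance(1)
assert not d1.called and not d2.called  # still sleeping out the backoff

# The destination is reported as up again: both sleeps return immediately
# and the pending callLater timers are cancelled.
sleeper.wake("remote.example.com")
assert d1.called and d2.called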

View File

@@ -14,23 +14,27 @@
 import logging
 import random
 from types import TracebackType
-from typing import Any, Optional, Type
+from typing import TYPE_CHECKING, Any, Optional, Type
 
 import synapse.logging.context
 from synapse.api.errors import CodeMessageException
 from synapse.storage import DataStore
 from synapse.util import Clock
 
+if TYPE_CHECKING:
+    from synapse.notifier import Notifier
+    from synapse.replication.tcp.handler import ReplicationCommandHandler
+
 logger = logging.getLogger(__name__)
 
 # the initial backoff, after the first transaction fails
-MIN_RETRY_INTERVAL = 10 * 60 * 1000
+MIN_RETRY_INTERVAL = 30 * 1000
 
 # how much we multiply the backoff by after each subsequent fail
 RETRY_MULTIPLIER = 5
 
-# a cap on the backoff. (Essentially none)
-MAX_RETRY_INTERVAL = 2**62
+# a cap on the backoff.
+MAX_RETRY_INTERVAL = 30 * 1000
 
 
 class NotRetryingDestination(Exception):
@@ -131,6 +135,8 @@ class RetryDestinationLimiter:
         retry_interval: int,
         backoff_on_404: bool = False,
         backoff_on_failure: bool = True,
+        notifier: Optional["Notifier"] = None,
+        replication_client: Optional["ReplicationCommandHandler"] = None,
     ):
         """Marks the destination as "down" if an exception is thrown in the
         context, except for CodeMessageException with code < 500.
@@ -160,6 +166,9 @@ class RetryDestinationLimiter:
         self.backoff_on_404 = backoff_on_404
         self.backoff_on_failure = backoff_on_failure
 
+        self.notifier = notifier
+        self.replication_client = replication_client
+
     def __enter__(self) -> None:
         pass
@@ -239,6 +248,19 @@ class RetryDestinationLimiter:
                     retry_last_ts,
                     self.retry_interval,
                 )
+
+                if self.notifier:
+                    # Inform the relevant places that the remote server is back up.
+                    self.notifier.notify_remote_server_up(self.destination)
+
+                if self.replication_client:
+                    # If we're on a worker we try and inform master about this. The
+                    # replication client doesn't hook into the notifier to avoid
+                    # infinite loops where we send a `REMOTE_SERVER_UP` command to
+                    # master, which then echoes it back to us which in turn pokes
+                    # the notifier.
+                    self.replication_client.send_remote_server_up(self.destination)
+
             except Exception:
                 logger.exception("Failed to store destination_retry_timings")
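
Putting the files together, the wake-up path this compare introduces is roughly as follows (names taken from the hunks above; which branches run depends on whether a notifier and/or replication client was passed in, per the comments in the last hunk):

RetryDestinationLimiter.__exit__                              # a backed-off destination responded again
    -> notifier.notify_remote_server_up(destination)
           -> federation_sender.wake_destination(destination)    # existing behaviour
           -> federation_client.wake_destination(destination)    # new: MatrixFederationHttpClient
                  -> AwakenableSleeper.wake(destination)         # pending retry sleeps return at once
    -> replication_client.send_remote_server_up(destination)     # workers inform the main process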

View File

@@ -33,7 +33,7 @@ from synapse.http.matrixfederationclient import (
 from synapse.logging.context import SENTINEL_CONTEXT, LoggingContext, current_context
 
 from tests.server import FakeTransport
-from tests.unittest import HomeserverTestCase
+from tests.unittest import HomeserverTestCase, override_config
 
 
 def check_logcontext(context):
@@ -617,3 +617,19 @@ class FederationClientTests(HomeserverTestCase):
         self.assertIsInstance(f.value, RequestSendFailed)
 
         self.assertTrue(transport.disconnecting)
+
+    @override_config(
+        {
+            "experimental_features": {
+                "max_long_retry_delay": 100,
+                "max_short_retry_delay": 7,
+                "max_long_retries": 20,
+                "max_short_retries": 5,
+            }
+        }
+    )
+    def test_configurable_retry_and_delay_values(self):
+        self.assertEqual(self.cl.max_long_retry_delay, 100)
+        self.assertEqual(self.cl.max_short_retry_delay, 7)
+        self.assertEqual(self.cl.max_long_retries, 20)
+        self.assertEqual(self.cl.max_short_retries, 5)

View File

@@ -28,6 +28,7 @@ from synapse.logging.context import (
     make_deferred_yieldable,
 )
 from synapse.util.async_helpers import (
+    AwakenableSleeper,
     ObservableDeferred,
     concurrently_execute,
     delay_cancellation,
@@ -35,6 +36,7 @@ from synapse.util.async_helpers import (
     timeout_deferred,
 )
 
+from tests.server import get_clock
 from tests.unittest import TestCase
@@ -467,3 +469,39 @@ class DelayCancellationTests(TestCase):
         # logging context.
         blocking_d.callback(None)
         self.successResultOf(d)
+
+
+class AwakenableSleeperTests(TestCase):
+    "Tests AwakenableSleeper"
+
+    def test_sleep(self):
+        reactor, _ = get_clock()
+        sleeper = AwakenableSleeper(reactor)
+
+        d = defer.ensureDeferred(sleeper.sleep("name", 1000))
+
+        reactor.pump([0.0])
+        self.assertFalse(d.called)
+
+        reactor.advance(0.5)
+        self.assertFalse(d.called)
+
+        reactor.advance(0.6)
+        self.assertTrue(d.called)
+
+    def test_explicit_wake(self):
+        reactor, _ = get_clock()
+        sleeper = AwakenableSleeper(reactor)
+
+        d = defer.ensureDeferred(sleeper.sleep("name", 1000))
+
+        reactor.pump([0.0])
+        self.assertFalse(d.called)
+
+        reactor.advance(0.5)
+        self.assertFalse(d.called)
+
+        sleeper.wake("name")
+        self.assertTrue(d.called)
+
+        reactor.advance(0.6)