Compare commits


13 Commits

Author  SHA1  Message  Date
Erik Johnston  8410021d1f  Fixup  2022-04-21 11:49:51 +01:00
Erik Johnston  e16ec8f804  Mark remote server as up when we can talk to it  2022-04-21 11:49:51 +01:00
Erik Johnston  1e19c79a5d  Reduce default max retry times  2022-04-21 11:49:51 +01:00
H. Shay  738742f439  add simple test  2022-04-21 11:49:51 +01:00
H. Shay  f304297486  lint  2022-04-21 11:49:51 +01:00
H. Shay  1736be0678  add max_short_retries to config and rename min_retry_delay -> max_short_retry_delay  2022-04-21 11:49:51 +01:00
H. Shay  6b38d55755  fix newsfragment  2022-04-21 11:49:51 +01:00
H. Shay  0a3fd49bb3  newsframent  2022-04-21 11:49:51 +01:00
H. Shay  312f489919  make matrix fed requests retries configurable  2022-04-21 11:49:51 +01:00
Erik Johnston  9823af0744  Reduce retry interval  2022-04-21 11:49:51 +01:00
Erik Johnston  6cb4339f42  Newsfile  2022-04-21 11:49:51 +01:00
Erik Johnston  a917a36936  Immediate retry any requests that have backed off when a server comes back online  2022-04-21 11:49:51 +01:00
Erik Johnston  fe0048c2f7  Add a AwakenableSleeper class  2022-04-21 11:49:51 +01:00
9 changed files with 176 additions and 18 deletions

changelog.d/12500.misc (new file)

@@ -0,0 +1 @@
+Immediately retry any requests that have backed off when a server comes back online.

changelog.d/12504.misc (new file)

@@ -0,0 +1 @@
+Allow for the configuration of max request retries and min/max retry delays in the matrix federation client.

synapse/config/experimental.py

@@ -81,3 +81,10 @@ class ExperimentalConfig(Config):
         # MSC2654: Unread counts
         self.msc2654_enabled: bool = experimental.get("msc2654_enabled", False)
+
+        # Allow for the configuration of max request retries and min/max retry delays
+        # in the matrix federation client
+        self.max_long_retry_delay = experimental.get("max_long_retry_delay", 10)
+        self.max_short_retry_delay = experimental.get("max_short_retry_delay", 2)
+        self.max_long_retries = experimental.get("max_long_retries", 10)
+        self.max_short_retries = experimental.get("max_short_retries", 3)
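These values are read from the `experimental_features` section of the homeserver config; the test at the bottom of this diff overrides them the same way. As a quick illustration (not part of the change) of how the `.get()` fallbacks behave, with a hypothetical config that sets only one key:

    # Sketch: each .get() prefers the configured value, else the default shown above.
    experimental = {"max_short_retry_delay": 5}  # hypothetical operator config

    assert experimental.get("max_short_retry_delay", 2) == 5   # configured value wins
    assert experimental.get("max_long_retry_delay", 10) == 10  # default
    assert experimental.get("max_long_retries", 10) == 10      # default
    assert experimental.get("max_short_retries", 3) == 3       # default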

synapse/http/matrixfederationclient.py

@@ -73,7 +73,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import set_tag, start_active_span, tags
 from synapse.types import JsonDict
 from synapse.util import json_decoder
-from synapse.util.async_helpers import timeout_deferred
+from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
 from synapse.util.metrics import Measure

 if TYPE_CHECKING:
@@ -92,8 +92,6 @@ incoming_responses_counter = Counter(
 # need a generous limit here.
 MAX_RESPONSE_SIZE = 100 * 1024 * 1024

-MAX_LONG_RETRIES = 10
-MAX_SHORT_RETRIES = 3

 MAXINT = sys.maxsize
@@ -348,11 +346,23 @@ class MatrixFederationHttpClient:
         self.version_string_bytes = hs.version_string.encode("ascii")
         self.default_timeout = 60

+        self.max_long_retry_delay = hs.config.experimental.max_long_retry_delay
+        self.max_short_retry_delay = hs.config.experimental.max_short_retry_delay
+        self.max_long_retries = hs.config.experimental.max_long_retries
+        self.max_short_retries = hs.config.experimental.max_short_retries
+
         def schedule(x):
             self.reactor.callLater(_EPSILON, x)

         self._cooperator = Cooperator(scheduler=schedule)

+        self._sleeper = AwakenableSleeper(self.reactor)
+
+    def wake_destination(self, destination: str) -> None:
+        """Called when the remote server may have come back online."""
+        self._sleeper.wake(destination)
+
     async def _send_request_with_optional_trailing_slash(
         self,
         request: MatrixFederationRequest,
@@ -474,6 +484,8 @@ class MatrixFederationHttpClient:
                 self._store,
                 backoff_on_404=backoff_on_404,
                 ignore_backoff=ignore_backoff,
+                notifier=self.hs.get_notifier(),
+                replication_client=self.hs.get_replication_command_handler(),
             )

             method_bytes = request.method.encode("ascii")
@@ -502,9 +514,9 @@
             # XXX: Would be much nicer to retry only at the transaction-layer
             # (once we have reliable transactions in place)
             if long_retries:
-                retries_left = MAX_LONG_RETRIES
+                retries_left = self.max_long_retries
             else:
-                retries_left = MAX_SHORT_RETRIES
+                retries_left = self.max_short_retries

             url_bytes = request.uri
             url_str = url_bytes.decode("ascii")
@@ -649,12 +661,12 @@
                 if retries_left and not timeout:
                     if long_retries:
-                        delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
-                        delay = min(delay, 60)
+                        delay = 4 ** (self.max_long_retries + 1 - retries_left)
+                        delay = min(delay, self.max_long_retry_delay)
                         delay *= random.uniform(0.8, 1.4)
                     else:
-                        delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
-                        delay = min(delay, 2)
+                        delay = 0.5 * 2 ** (self.max_short_retries - retries_left)
+                        delay = min(delay, self.max_short_retry_delay)
                         delay *= random.uniform(0.8, 1.4)

                     logger.debug(
@@ -664,7 +676,9 @@
                         delay,
                     )

-                    await self.clock.sleep(delay)
+                    # Sleep for the calculated delay, or wake up immediately
+                    # if we get notified that the server is back up.
+                    await self._sleeper.sleep(request.destination, delay * 1000)
                     retries_left -= 1
                 else:
                     raise
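For intuition, a standalone sketch (not part of the diff) of the delay schedules the formulas above produce with the new defaults (max_long_retries=10, max_long_retry_delay=10, max_short_retries=3, max_short_retry_delay=2), omitting the 0.8-1.4x jitter factor:

    # Sketch: reproduce the retry delay schedule from the hunk above.
    max_long_retries, max_long_retry_delay = 10, 10
    max_short_retries, max_short_retry_delay = 3, 2

    long_delays = [
        min(4 ** (max_long_retries + 1 - r), max_long_retry_delay)
        for r in range(max_long_retries, 0, -1)
    ]
    short_delays = [
        min(0.5 * 2 ** (max_short_retries - r), max_short_retry_delay)
        for r in range(max_short_retries, 0, -1)
    ]

    print(long_delays)   # [4, 10, 10, 10, 10, 10, 10, 10, 10, 10] -- seconds
    print(short_delays)  # [0.5, 1.0, 2.0] -- seconds

So a long-retries request now sleeps 4s after the first failure and then 10s per attempt, where the removed hard-coded caps were 60s and 2s.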

synapse/notifier.py

@@ -228,9 +228,7 @@ class Notifier:
         # Called when there are new things to stream over replication
         self.replication_callbacks: List[Callable[[], None]] = []

-        # Called when remote servers have come back online after having been
-        # down.
-        self.remote_server_up_callbacks: List[Callable[[str], None]] = []
+        self._federation_client = hs.get_federation_http_client()

         self._third_party_rules = hs.get_third_party_event_rules()
@@ -731,3 +729,7 @@ class Notifier:
         # circular dependencies.
         if self.federation_sender:
             self.federation_sender.wake_destination(server)
+
+        # Tell the federation client about the fact the server is back up, so
+        # that any in flight requests can be immediately retried.
+        self._federation_client.wake_destination(server)

synapse/util/async_helpers.py

@@ -734,3 +734,60 @@ def delay_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
     new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
     deferred.chainDeferred(new_deferred)
     return new_deferred
+
+
+class AwakenableSleeper:
+    """Allows explicitly waking up deferreds related to an entity that are
+    currently sleeping.
+    """
+
+    def __init__(self, reactor: IReactorTime) -> None:
+        self._streams: Dict[str, Set[defer.Deferred[None]]] = {}
+        self._reactor = reactor
+
+    def wake(self, name: str) -> None:
+        """Wake everything related to `name` that is currently sleeping."""
+        stream_set = self._streams.pop(name, set())
+        for deferred in set(stream_set):
+            try:
+                with PreserveLoggingContext():
+                    deferred.callback(None)
+            except Exception:
+                pass
+
+    async def sleep(self, name: str, delay_ms: int) -> None:
+        """Sleep for the given number of milliseconds, or return if the given
+        `name` is explicitly woken up.
+        """
+
+        # Create a deferred that gets called in N seconds
+        sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
+        call = self._reactor.callLater(delay_ms / 1000, sleep_deferred.callback, None)
+
+        # Create a deferred that will get called if `wake` is called with
+        # the same `name`.
+        stream_set = self._streams.setdefault(name, set())
+        notify_deferred: "defer.Deferred[None]" = defer.Deferred()
+        stream_set.add(notify_deferred)
+
+        try:
+            # Wait for either the delay or for `wake` to be called.
+            await make_deferred_yieldable(
+                defer.DeferredList(
+                    [sleep_deferred, notify_deferred],
+                    fireOnOneCallback=True,
+                    fireOnOneErrback=True,
+                    consumeErrors=True,
+                )
+            )
+        finally:
+            # Clean up the state
+            stream_set.discard(notify_deferred)
+
+            curr_stream_set = self._streams.get(name)
+            if curr_stream_set is not None and len(curr_stream_set) == 0:
+                self._streams.pop(name)
+
+            # Cancel the sleep if we were woken up
+            if call.active():
+                call.cancel()
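As a usage illustration (not part of the diff), the pattern the federation client adopts above looks roughly like this; the destination name and 30s delay are hypothetical:

    # Sketch: one coroutine sleeps keyed on a destination; something else wakes it early.
    from twisted.internet import defer, reactor

    from synapse.util.async_helpers import AwakenableSleeper

    sleeper = AwakenableSleeper(reactor)

    async def backoff_then_retry(destination: str) -> None:
        # Sleeps for up to 30s, but returns as soon as wake() is called
        # for the same destination.
        await sleeper.sleep(destination, 30 * 1000)
        # ... retry the request to `destination` here ...

    d = defer.ensureDeferred(backoff_then_retry("remote.example.com"))

    # Elsewhere, e.g. on a REMOTE_SERVER_UP notification:
    sleeper.wake("remote.example.com")

Note that several sleeps can share a name: `wake` pops the whole set of deferreds for that name, so every in-flight request for the destination is released at once.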

synapse/util/retryutils.py

@@ -14,23 +14,27 @@
 import logging
 import random
 from types import TracebackType
-from typing import Any, Optional, Type
+from typing import TYPE_CHECKING, Any, Optional, Type

 import synapse.logging.context
 from synapse.api.errors import CodeMessageException
 from synapse.storage import DataStore
 from synapse.util import Clock

+if TYPE_CHECKING:
+    from synapse.notifier import Notifier
+    from synapse.replication.tcp.handler import ReplicationCommandHandler
+
 logger = logging.getLogger(__name__)

 # the initial backoff, after the first transaction fails
-MIN_RETRY_INTERVAL = 10 * 60 * 1000
+MIN_RETRY_INTERVAL = 30 * 1000

 # how much we multiply the backoff by after each subsequent fail
 RETRY_MULTIPLIER = 5

-# a cap on the backoff. (Essentially none)
-MAX_RETRY_INTERVAL = 2**62
+# a cap on the backoff.
+MAX_RETRY_INTERVAL = 30 * 1000


 class NotRetryingDestination(Exception):
@@ -131,6 +135,8 @@ class RetryDestinationLimiter:
         retry_interval: int,
         backoff_on_404: bool = False,
         backoff_on_failure: bool = True,
+        notifier: Optional["Notifier"] = None,
+        replication_client: Optional["ReplicationCommandHandler"] = None,
     ):
         """Marks the destination as "down" if an exception is thrown in the
         context, except for CodeMessageException with code < 500.
@@ -160,6 +166,9 @@ class RetryDestinationLimiter:
         self.backoff_on_404 = backoff_on_404
         self.backoff_on_failure = backoff_on_failure

+        self.notifier = notifier
+        self.replication_client = replication_client
+
     def __enter__(self) -> None:
         pass
@@ -239,6 +248,19 @@
                     retry_last_ts,
                     self.retry_interval,
                 )
+
+                if self.notifier:
+                    # Inform the relevant places that the remote server is back up.
+                    self.notifier.notify_remote_server_up(self.destination)
+
+                if self.replication_client:
+                    # If we're on a worker we try and inform master about this. The
+                    # replication client doesn't hook into the notifier to avoid
+                    # infinite loops where we send a `REMOTE_SERVER_UP` command to
+                    # master, which then echoes it back to us which in turn pokes
+                    # the notifier.
+                    self.replication_client.send_remote_server_up(self.destination)
+
             except Exception:
                 logger.exception("Failed to store destination_retry_timings")
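One observation (not part of the diff): with MIN_RETRY_INTERVAL and MAX_RETRY_INTERVAL both set to 30s above, the per-destination backoff can no longer grow, assuming the limiter keeps multiplying retry_interval by RETRY_MULTIPLIER and capping it at MAX_RETRY_INTERVAL as before (jitter omitted):

    # Sketch: successive retry intervals under the new constants.
    MIN_RETRY_INTERVAL = 30 * 1000  # ms
    RETRY_MULTIPLIER = 5
    MAX_RETRY_INTERVAL = 30 * 1000  # ms

    interval = MIN_RETRY_INTERVAL
    for attempt in range(1, 5):
        print(f"attempt {attempt}: retry in {interval // 1000}s")  # 30s every time
        interval = min(interval * RETRY_MULTIPLIER, MAX_RETRY_INTERVAL)

That is, a failing destination is retried on a flat ~30s cadence instead of starting at 10 minutes and climbing towards the old effectively unbounded 2**62 ms cap.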

tests/http/test_fedclient.py

@@ -33,7 +33,7 @@ from synapse.http.matrixfederationclient import (
 from synapse.logging.context import SENTINEL_CONTEXT, LoggingContext, current_context

 from tests.server import FakeTransport
-from tests.unittest import HomeserverTestCase
+from tests.unittest import HomeserverTestCase, override_config


 def check_logcontext(context):
@@ -617,3 +617,19 @@ class FederationClientTests(HomeserverTestCase):
         self.assertIsInstance(f.value, RequestSendFailed)
         self.assertTrue(transport.disconnecting)
+
+    @override_config(
+        {
+            "experimental_features": {
+                "max_long_retry_delay": 100,
+                "max_short_retry_delay": 7,
+                "max_long_retries": 20,
+                "max_short_retries": 5,
+            }
+        }
+    )
+    def test_configurable_retry_and_delay_values(self):
+        self.assertEqual(self.cl.max_long_retry_delay, 100)
+        self.assertEqual(self.cl.max_short_retry_delay, 7)
+        self.assertEqual(self.cl.max_long_retries, 20)
+        self.assertEqual(self.cl.max_short_retries, 5)

tests/util/test_async_helpers.py

@@ -28,6 +28,7 @@ from synapse.logging.context import (
     make_deferred_yieldable,
 )
 from synapse.util.async_helpers import (
+    AwakenableSleeper,
     ObservableDeferred,
     concurrently_execute,
     delay_cancellation,
@@ -35,6 +36,7 @@ from synapse.util.async_helpers import (
     timeout_deferred,
 )

+from tests.server import get_clock
 from tests.unittest import TestCase
@@ -467,3 +469,39 @@ class DelayCancellationTests(TestCase):
         # logging context.
         blocking_d.callback(None)
         self.successResultOf(d)
+
+
+class AwakenableSleeperTests(TestCase):
+    "Tests AwakenableSleeper"
+
+    def test_sleep(self):
+        reactor, _ = get_clock()
+        sleeper = AwakenableSleeper(reactor)
+
+        d = defer.ensureDeferred(sleeper.sleep("name", 1000))
+
+        reactor.pump([0.0])
+        self.assertFalse(d.called)
+
+        reactor.advance(0.5)
+        self.assertFalse(d.called)
+
+        reactor.advance(0.6)
+        self.assertTrue(d.called)
+
+    def test_explicit_wake(self):
+        reactor, _ = get_clock()
+        sleeper = AwakenableSleeper(reactor)
+
+        d = defer.ensureDeferred(sleeper.sleep("name", 1000))
+
+        reactor.pump([0.0])
+        self.assertFalse(d.called)
+
+        reactor.advance(0.5)
+        self.assertFalse(d.called)
+
+        sleeper.wake("name")
+        self.assertTrue(d.called)
+
+        reactor.advance(0.6)
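(The final `reactor.advance(0.6)` steps past the original one-second deadline after the explicit wake, presumably to check that the cancelled sleep timer no longer fires.)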