mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Recover an appservice if a successful ping occurs. (#18521)
Fixes https://github.com/element-hq/synapse/issues/14240 This scratches an itch that i've had for years. We regularly run into the issue where (especially in development) appservices can go down for a period and them come back up. The ping endpoint was introduced some time ago which means Synapse can determine if an AS is up more or less immediately, so we might as well use that to schedule transaction redelivery. I believe transaction scheduling logic is largely implementation specific, so we should be in the clear to do this without any spec changes.
This commit is contained in:
1
changelog.d/18521.feature
Normal file
1
changelog.d/18521.feature
Normal file
@@ -0,0 +1 @@
|
||||
Successful requests to `/_matrix/app/v1/ping` will now force Synapse to reattempt delivering transactions to appservices.
|
||||
@@ -2,7 +2,7 @@
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
# Copyright (C) 2023, 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
@@ -70,6 +70,8 @@ from typing import (
|
||||
Tuple,
|
||||
)
|
||||
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
|
||||
from synapse.appservice import (
|
||||
ApplicationService,
|
||||
ApplicationServiceState,
|
||||
@@ -450,6 +452,20 @@ class _TransactionController:
|
||||
recoverer.recover()
|
||||
logger.info("Now %i active recoverers", len(self.recoverers))
|
||||
|
||||
def force_retry(self, service: ApplicationService) -> None:
|
||||
"""Forces a Recoverer to attempt delivery of transations immediately.
|
||||
|
||||
Args:
|
||||
service:
|
||||
"""
|
||||
recoverer = self.recoverers.get(service.id)
|
||||
if not recoverer:
|
||||
# No need to force a retry on a happy AS.
|
||||
logger.info(f"{service.id} is not in recovery, not forcing retry")
|
||||
return
|
||||
|
||||
recoverer.force_retry()
|
||||
|
||||
async def _is_service_up(self, service: ApplicationService) -> bool:
|
||||
state = await self.store.get_appservice_state(service)
|
||||
return state == ApplicationServiceState.UP or state is None
|
||||
@@ -482,11 +498,12 @@ class _Recoverer:
|
||||
self.service = service
|
||||
self.callback = callback
|
||||
self.backoff_counter = 1
|
||||
self.scheduled_recovery: Optional[IDelayedCall] = None
|
||||
|
||||
def recover(self) -> None:
|
||||
delay = 2**self.backoff_counter
|
||||
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
|
||||
self.clock.call_later(
|
||||
self.scheduled_recovery = self.clock.call_later(
|
||||
delay, run_as_background_process, "as-recoverer", self.retry
|
||||
)
|
||||
|
||||
@@ -496,6 +513,21 @@ class _Recoverer:
|
||||
self.backoff_counter += 1
|
||||
self.recover()
|
||||
|
||||
def force_retry(self) -> None:
|
||||
"""Cancels the existing timer and forces an immediate retry in the background.
|
||||
|
||||
Args:
|
||||
service:
|
||||
"""
|
||||
# Prevent the existing backoff from occuring
|
||||
if self.scheduled_recovery:
|
||||
self.clock.cancel_call_later(self.scheduled_recovery)
|
||||
# Run a retry, which will resechedule a recovery if it fails.
|
||||
run_as_background_process(
|
||||
"retry",
|
||||
self.retry,
|
||||
)
|
||||
|
||||
async def retry(self) -> None:
|
||||
logger.info("Starting retries on %s", self.service.id)
|
||||
try:
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright 2023 Tulir Asokan
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
# Copyright (C) 2023, 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
@@ -53,6 +53,7 @@ class AppservicePingRestServlet(RestServlet):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
self.as_api = hs.get_application_service_api()
|
||||
self.scheduler = hs.get_application_service_scheduler()
|
||||
self.auth = hs.get_auth()
|
||||
|
||||
async def on_POST(
|
||||
@@ -85,6 +86,10 @@ class AppservicePingRestServlet(RestServlet):
|
||||
start = time.monotonic()
|
||||
try:
|
||||
await self.as_api.ping(requester.app_service, txn_id)
|
||||
|
||||
# We got a OK response, so if the AS needs to be recovered then lets recover it now.
|
||||
# This sets off a task in the background and so is safe to execute and forget.
|
||||
self.scheduler.txn_ctrl.force_retry(requester.app_service)
|
||||
except RequestTimedOutError as e:
|
||||
raise SynapseError(
|
||||
HTTPStatus.GATEWAY_TIMEOUT,
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
# Copyright (C) 2023, 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
@@ -234,6 +234,41 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
|
||||
self.assertEqual(1, txn.complete.call_count)
|
||||
self.callback.assert_called_once_with(self.recoverer)
|
||||
|
||||
def test_recover_force_retry(self) -> None:
|
||||
txn = Mock()
|
||||
txns = [txn, None]
|
||||
pop_txn = False
|
||||
|
||||
def take_txn(
|
||||
*args: object, **kwargs: object
|
||||
) -> "defer.Deferred[Optional[Mock]]":
|
||||
if pop_txn:
|
||||
return defer.succeed(txns.pop(0))
|
||||
else:
|
||||
return defer.succeed(txn)
|
||||
|
||||
self.store.get_oldest_unsent_txn = Mock(side_effect=take_txn)
|
||||
|
||||
# Start the recovery, and then fail the first attempt.
|
||||
self.recoverer.recover()
|
||||
self.assertEqual(0, self.store.get_oldest_unsent_txn.call_count)
|
||||
txn.send = AsyncMock(return_value=False)
|
||||
txn.complete = AsyncMock(return_value=None)
|
||||
self.clock.advance_time(2)
|
||||
self.assertEqual(1, txn.send.call_count)
|
||||
self.assertEqual(0, txn.complete.call_count)
|
||||
self.assertEqual(0, self.callback.call_count)
|
||||
|
||||
# Now allow the send to succeed, and force a retry.
|
||||
pop_txn = True # returns the txn the first time, then no more.
|
||||
txn.send = AsyncMock(return_value=True) # successfully send the txn
|
||||
self.recoverer.force_retry()
|
||||
self.assertEqual(1, txn.send.call_count) # new mock reset call count
|
||||
self.assertEqual(1, txn.complete.call_count)
|
||||
|
||||
# Ensure we call the callback to say we're done!
|
||||
self.callback.assert_called_once_with(self.recoverer)
|
||||
|
||||
|
||||
# Corresponds to synapse.appservice.scheduler._TransactionController.send
|
||||
TxnCtrlArgs: TypeAlias = """
|
||||
|
||||
Reference in New Issue
Block a user