Compare commits

...

4 Commits

Author SHA1 Message Date
Erik Johnston
6002debbde Timeout Linearizer 2025-06-25 13:35:03 +01:00
Erik Johnston
4247fa48b5 Add test 2025-06-25 13:28:33 +01:00
Erik Johnston
b946b028bd Refactor test to use clock 2025-06-25 13:28:33 +01:00
Erik Johnston
f401976fd8 Add timeout and max queue size support 2025-06-25 13:28:33 +01:00
4 changed files with 157 additions and 21 deletions

View File

@@ -76,7 +76,13 @@ from synapse.types import (
create_requester, create_requester,
) )
from synapse.types.state import StateFilter from synapse.types.state import StateFilter
from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError from synapse.util import (
Duration,
json_decoder,
json_encoder,
log_failure,
unwrapFirstError,
)
from synapse.util.async_helpers import Linearizer, gather_results from synapse.util.async_helpers import Linearizer, gather_results
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
@@ -506,7 +512,13 @@ class EventCreationHandler:
# We limit concurrent event creation for a room to 1. This prevents state resolution # We limit concurrent event creation for a room to 1. This prevents state resolution
# from occurring when sending bursts of events to a local room # from occurring when sending bursts of events to a local room
self.limiter = Linearizer(max_count=1, name="room_event_creation_limit") self.limiter = Linearizer(
max_count=1,
name="room_event_creation_limit",
# We timeout queued requests after 90 seconds, as the client will
# likely have timed out by then.
timeout=90 * Duration.SECOND_MS,
)
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()

View File

@@ -59,7 +59,8 @@ class Duration:
"""Helper class that holds constants for common time durations in """Helper class that holds constants for common time durations in
milliseconds.""" milliseconds."""
MINUTE_MS = 60 * 1000 SECOND_MS = 1000
MINUTE_MS = 60 * SECOND_MS
HOUR_MS = 60 * MINUTE_MS HOUR_MS = 60 * MINUTE_MS
DAY_MS = 24 * HOUR_MS DAY_MS = 24 * HOUR_MS

View File

@@ -59,6 +59,7 @@ from twisted.internet.defer import CancelledError
from twisted.internet.interfaces import IReactorTime from twisted.internet.interfaces import IReactorTime
from twisted.python.failure import Failure from twisted.python.failure import Failure
from synapse.api.errors import SynapseError
from synapse.logging.context import ( from synapse.logging.context import (
PreserveLoggingContext, PreserveLoggingContext,
make_deferred_yieldable, make_deferred_yieldable,
@@ -535,10 +536,19 @@ class Linearizer:
name: Optional[str] = None, name: Optional[str] = None,
max_count: int = 1, max_count: int = 1,
clock: Optional[Clock] = None, clock: Optional[Clock] = None,
timeout: Optional[int] = None,
max_queue_size: Optional[int] = None,
limit_error_code: int = 503,
): ):
""" """
Args: Args:
max_count: The maximum number of concurrent accesses max_count: The maximum number of concurrent accesses
timeout: The maximum time to wait for a lock to be acquired, in
milliseconds. Optional.
max_queue_size: The maximum number of items for a given key that
can be queued up waiting for a lock. Optional.
limit_error_code: The HTTP error code to return when one of the
above limits is reached.
""" """
if name is None: if name is None:
self.name: Union[str, int] = id(self) self.name: Union[str, int] = id(self)
@@ -551,6 +561,9 @@ class Linearizer:
clock = Clock(cast(IReactorTime, reactor)) clock = Clock(cast(IReactorTime, reactor))
self._clock = clock self._clock = clock
self.max_count = max_count self.max_count = max_count
self._timeout = timeout
self._max_queue_size = max_queue_size
self._limit_error_code = limit_error_code
# key_to_defer is a map from the key to a _LinearizerEntry. # key_to_defer is a map from the key to a _LinearizerEntry.
self.key_to_defer: Dict[Hashable, _LinearizerEntry] = {} self.key_to_defer: Dict[Hashable, _LinearizerEntry] = {}
@@ -594,6 +607,17 @@ class Linearizer:
entry.count += 1 entry.count += 1
return entry return entry
# Check if the number of deferreds waiting for this key has reached the
# maximum queue size.
if self._max_queue_size and len(entry.deferreds) >= self._max_queue_size:
logger.warning(
"Linearizer %r for key %r has reached max queue size %d",
self.name,
key,
self._max_queue_size,
)
raise SynapseError(code=self._limit_error_code, msg="Limit exceeded")
# Otherwise, the number of things executing is at the maximum and we have to # Otherwise, the number of things executing is at the maximum and we have to
# add a deferred to the list of blocked items. # add a deferred to the list of blocked items.
# When one of the things currently executing finishes it will callback # When one of the things currently executing finishes it will callback
@@ -604,7 +628,22 @@ class Linearizer:
entry.deferreds[new_defer] = 1 entry.deferreds[new_defer] = 1
try: try:
if self._timeout:
await timeout_deferred(
new_defer,
timeout=self._timeout / 1000,
reactor=self._clock._reactor,
)
else:
await new_defer await new_defer
except defer.TimeoutError:
logger.warning(
"Timed out waiting for linearizer lock %r for key %r",
self.name,
key,
)
del entry.deferreds[new_defer]
raise SynapseError(code=self._limit_error_code, msg="Limit exceeded")
except Exception as e: except Exception as e:
logger.info("defer %r got err %r", new_defer, e) logger.info("defer %r got err %r", new_defer, e)
if isinstance(e, CancelledError): if isinstance(e, CancelledError):
@@ -653,12 +692,14 @@ class Linearizer:
# blocked waiting to execute and start one of them # blocked waiting to execute and start one of them
entry.count -= 1 entry.count -= 1
# Find the first deferred in the list that is pending completion and
# call it.
if entry.deferreds: if entry.deferreds:
(next_def, _) = entry.deferreds.popitem(last=False) (next_def, _) = entry.deferreds.popitem(last=False)
# we need to run the next thing in the sentinel context. # we need to run the next thing in the sentinel context.
with PreserveLoggingContext(): with PreserveLoggingContext():
next_def.callback(None) next_def.callback(None)
elif entry.count == 0: elif entry.count == 0:
# We were the last thing for this key: remove it from the # We were the last thing for this key: remove it from the
# map. # map.

View File

@@ -21,14 +21,15 @@
from typing import Hashable, Protocol, Tuple from typing import Hashable, Protocol, Tuple
from twisted.internet import defer, reactor from twisted.internet import defer
from twisted.internet.base import ReactorBase
from twisted.internet.defer import CancelledError, Deferred from twisted.internet.defer import CancelledError, Deferred
from synapse.api.errors import SynapseError
from synapse.logging.context import LoggingContext, current_context from synapse.logging.context import LoggingContext, current_context
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from tests import unittest from tests import unittest
from tests.server import get_clock
class UnblockFunction(Protocol): class UnblockFunction(Protocol):
@@ -36,6 +37,9 @@ class UnblockFunction(Protocol):
class LinearizerTestCase(unittest.TestCase): class LinearizerTestCase(unittest.TestCase):
def setUp(self) -> None:
self.reactor, self.clock = get_clock()
def _start_task( def _start_task(
self, linearizer: Linearizer, key: Hashable self, linearizer: Linearizer, key: Hashable
) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]: ) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]:
@@ -67,19 +71,13 @@ class LinearizerTestCase(unittest.TestCase):
# The next task, if it exists, will acquire the lock and require a kick of # The next task, if it exists, will acquire the lock and require a kick of
# the reactor to advance. # the reactor to advance.
if pump_reactor: if pump_reactor:
self._pump() self.reactor.pump([0.0])
return d, acquired_d, unblock return d, acquired_d, unblock
def _pump(self) -> None:
"""Pump the reactor to advance `Linearizer`s."""
assert isinstance(reactor, ReactorBase)
while reactor.getDelayedCalls():
reactor.runUntilCurrent()
def test_linearizer(self) -> None: def test_linearizer(self) -> None:
"""Tests that a task is queued up behind an earlier task.""" """Tests that a task is queued up behind an earlier task."""
linearizer = Linearizer() linearizer = Linearizer(clock=self.clock)
key = object() key = object()
@@ -100,7 +98,7 @@ class LinearizerTestCase(unittest.TestCase):
Runs through the same scenario as `test_linearizer`. Runs through the same scenario as `test_linearizer`.
""" """
linearizer = Linearizer() linearizer = Linearizer(clock=self.clock)
key = object() key = object()
@@ -131,7 +129,7 @@ class LinearizerTestCase(unittest.TestCase):
The stack should *not* explode when the slow thing completes. The stack should *not* explode when the slow thing completes.
""" """
linearizer = Linearizer() linearizer = Linearizer(clock=self.clock)
key = "" key = ""
async def func(i: int) -> None: async def func(i: int) -> None:
@@ -151,7 +149,7 @@ class LinearizerTestCase(unittest.TestCase):
def test_multiple_entries(self) -> None: def test_multiple_entries(self) -> None:
"""Tests a `Linearizer` with a concurrency above 1.""" """Tests a `Linearizer` with a concurrency above 1."""
limiter = Linearizer(max_count=3) limiter = Linearizer(clock=self.clock, max_count=3)
key = object() key = object()
@@ -192,7 +190,7 @@ class LinearizerTestCase(unittest.TestCase):
def test_cancellation(self) -> None: def test_cancellation(self) -> None:
"""Tests cancellation while waiting for a `Linearizer`.""" """Tests cancellation while waiting for a `Linearizer`."""
linearizer = Linearizer() linearizer = Linearizer(clock=self.clock)
key = object() key = object()
@@ -226,7 +224,7 @@ class LinearizerTestCase(unittest.TestCase):
def test_cancellation_during_sleep(self) -> None: def test_cancellation_during_sleep(self) -> None:
"""Tests cancellation during the sleep just after waiting for a `Linearizer`.""" """Tests cancellation during the sleep just after waiting for a `Linearizer`."""
linearizer = Linearizer() linearizer = Linearizer(clock=self.clock)
key = object() key = object()
@@ -246,7 +244,7 @@ class LinearizerTestCase(unittest.TestCase):
unblock1(pump_reactor=False) unblock1(pump_reactor=False)
self.successResultOf(d1) self.successResultOf(d1)
d2.cancel() d2.cancel()
self._pump() self.reactor.pump([0.0])
self.assertTrue(d2.called) self.assertTrue(d2.called)
self.failureResultOf(d2, CancelledError) self.failureResultOf(d2, CancelledError)
@@ -258,3 +256,87 @@ class LinearizerTestCase(unittest.TestCase):
) )
unblock3() unblock3()
self.successResultOf(d3) self.successResultOf(d3)
def test_timeout(self) -> None:
"""Test the `Linearizer` timeout behaviour."""
linearizer = Linearizer(
clock=self.clock,
timeout=10_000,
limit_error_code=999,
)
key = object()
d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
self.assertTrue(acquired_d1.called)
# Create a second task, waiting for the first task.
d2, acquired_d2, _ = self._start_task(linearizer, key)
self.assertFalse(acquired_d2.called)
self.assertFalse(d2.called)
# Wait for the timeout to occur.
self.reactor.advance(20_000)
# We should have received a timeout error for the second task, and *not*
# acquired the lock.
f = self.failureResultOf(d2, SynapseError)
self.assertEqual(f.value.code, 999)
self.assertFalse(acquired_d2.called)
# The first task should still be running.
self.assertFalse(d1.called)
# Create a third task, waiting for the first task.
d3, acquired_d3, _ = self._start_task(linearizer, key)
self.assertFalse(acquired_d3.called)
self.assertFalse(acquired_d2.called)
# Unblock the first task.
unblock1()
self.successResultOf(d1)
self.assertFalse(acquired_d2.called)
# The third task should have started running.
self.assertTrue(acquired_d3.called)
def test_max_queue_size(self) -> None:
"""Test the `Linearizer` max queue size behaviour."""
linearizer = Linearizer(
clock=self.clock,
max_queue_size=2,
limit_error_code=999,
)
key = object()
# Start three tasks.
d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
d2, acquired_d2, unblock2 = self._start_task(linearizer, key)
d3, _, unblock3 = self._start_task(linearizer, key)
d4, _, _ = self._start_task(linearizer, key)
self.assertTrue(acquired_d1.called)
self.assertFalse(d1.called)
self.assertFalse(d2.called)
self.assertFalse(d3.called)
# The fourth task should have been rejected.
self.failureResultOf(d4, SynapseError)
# Unblock the first task.
unblock1()
# Second task should now be running.
self.assertTrue(acquired_d2.called)
# Adding one more task should succeed, but further tasks should be rejected.
d5, acquired_d5, _ = self._start_task(linearizer, key)
self.assertFalse(d5.called)
d6, _, _ = self._start_task(linearizer, key)
self.failureResultOf(d6, SynapseError)
# Unblock the second and third task should cause the fifth task to start running.
unblock2()
self.assertTrue(d2.called)
unblock3()
self.assertTrue(d3.called)
self.assertTrue(acquired_d5.called)