Compare commits

...

4 Commits

Author SHA1 Message Date
Erik Johnston  6002debbde  Timeout Linearizer  2025-06-25 13:35:03 +01:00
Erik Johnston  4247fa48b5  Add test  2025-06-25 13:28:33 +01:00
Erik Johnston  b946b028bd  Refactor test to use clock  2025-06-25 13:28:33 +01:00
Erik Johnston  f401976fd8  Add timeout and max queue size support  2025-06-25 13:28:33 +01:00
4 changed files with 157 additions and 21 deletions
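
The changes in this compare add two optional limits to Linearizer: a per-waiter timeout and a cap on the number of queued waiters, both of which reject the caller with a SynapseError carrying a configurable HTTP code. A minimal usage sketch (not part of the diff), assuming Linearizer's queue(key) async context manager; the names do_work and key are illustrative:

from synapse.api.errors import SynapseError
from synapse.util import Duration
from synapse.util.async_helpers import Linearizer

# Both limits are optional; leaving them unset keeps the old behaviour.
linearizer = Linearizer(
    name="example_limit",
    max_count=1,
    timeout=30 * Duration.SECOND_MS,  # reject waiters stuck in the queue for 30s
    max_queue_size=100,               # reject new waiters once 100 are already queued
    limit_error_code=503,             # HTTP code attached to the SynapseError
)

async def do_work(key: str) -> None:
    try:
        async with linearizer.queue(key):
            ...  # at most max_count callers per key run this block concurrently
    except SynapseError as e:
        # Raised when the queue is full or the timeout elapses.
        print("rejected with HTTP status", e.code)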

View File

@@ -76,7 +76,13 @@ from synapse.types import (
create_requester,
)
from synapse.types.state import StateFilter
from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError
from synapse.util import (
Duration,
json_decoder,
json_encoder,
log_failure,
unwrapFirstError,
)
from synapse.util.async_helpers import Linearizer, gather_results
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
@@ -506,7 +512,13 @@ class EventCreationHandler:
# We limit concurrent event creation for a room to 1. This prevents state resolution
# from occurring when sending bursts of events to a local room
self.limiter = Linearizer(max_count=1, name="room_event_creation_limit")
self.limiter = Linearizer(
max_count=1,
name="room_event_creation_limit",
# We time out queued requests after 90 seconds, as the client will
# likely have timed out by then.
timeout=90 * Duration.SECOND_MS,
)
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()

View File

@@ -59,7 +59,8 @@ class Duration:
"""Helper class that holds constants for common time durations in
milliseconds."""
MINUTE_MS = 60 * 1000
SECOND_MS = 1000
MINUTE_MS = 60 * SECOND_MS
HOUR_MS = 60 * MINUTE_MS
DAY_MS = 24 * HOUR_MS
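
A small worked check of the refactored constants (assuming only what the diffs in this compare show): SECOND_MS becomes the base value, the derived constants keep their previous values, and the 90-second timeout set in EventCreationHandler above is built from it.

from synapse.util import Duration

assert Duration.SECOND_MS == 1_000
assert Duration.MINUTE_MS == 60 * Duration.SECOND_MS == 60_000
assert Duration.HOUR_MS == 60 * Duration.MINUTE_MS == 3_600_000
assert Duration.DAY_MS == 24 * Duration.HOUR_MS == 86_400_000

# The handler's setting, and the seconds value the Linearizer derives from it
# before handing it to timeout_deferred:
assert 90 * Duration.SECOND_MS == 90_000
assert (90 * Duration.SECOND_MS) / 1000 == 90.0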

View File

@@ -59,6 +59,7 @@ from twisted.internet.defer import CancelledError
from twisted.internet.interfaces import IReactorTime
from twisted.python.failure import Failure
from synapse.api.errors import SynapseError
from synapse.logging.context import (
PreserveLoggingContext,
make_deferred_yieldable,
@@ -535,10 +536,19 @@ class Linearizer:
name: Optional[str] = None,
max_count: int = 1,
clock: Optional[Clock] = None,
timeout: Optional[int] = None,
max_queue_size: Optional[int] = None,
limit_error_code: int = 503,
):
"""
Args:
max_count: The maximum number of concurrent accesses
timeout: The maximum time to wait for a lock to be acquired, in
milliseconds. Optional.
max_queue_size: The maximum number of items for a given key that
can be queued up waiting for a lock. Optional.
limit_error_code: The HTTP error code to return when one of the
above limits is reached.
"""
if name is None:
self.name: Union[str, int] = id(self)
@@ -551,6 +561,9 @@ class Linearizer:
clock = Clock(cast(IReactorTime, reactor))
self._clock = clock
self.max_count = max_count
self._timeout = timeout
self._max_queue_size = max_queue_size
self._limit_error_code = limit_error_code
# key_to_defer is a map from the key to a _LinearizerEntry.
self.key_to_defer: Dict[Hashable, _LinearizerEntry] = {}
@@ -594,6 +607,17 @@ class Linearizer:
entry.count += 1
return entry
# Check if the number of deferreds waiting for this key has reached the
# maximum queue size.
if self._max_queue_size and len(entry.deferreds) >= self._max_queue_size:
logger.warning(
"Linearizer %r for key %r has reached max queue size %d",
self.name,
key,
self._max_queue_size,
)
raise SynapseError(code=self._limit_error_code, msg="Limit exceeded")
# Otherwise, the number of things executing is at the maximum and we have to
# add a deferred to the list of blocked items.
# When one of the things currently executing finishes it will callback
@@ -604,7 +628,22 @@ class Linearizer:
entry.deferreds[new_defer] = 1
try:
await new_defer
if self._timeout:
await timeout_deferred(
new_defer,
timeout=self._timeout / 1000,
reactor=self._clock._reactor,
)
else:
await new_defer
except defer.TimeoutError:
logger.warning(
"Timed out waiting for linearizer lock %r for key %r",
self.name,
key,
)
del entry.deferreds[new_defer]
raise SynapseError(code=self._limit_error_code, msg="Limit exceeded")
except Exception as e:
logger.info("defer %r got err %r", new_defer, e)
if isinstance(e, CancelledError):
@@ -653,12 +692,14 @@ class Linearizer:
# blocked waiting to execute and start one of them
entry.count -= 1
# Find the first deferred in the list that is pending completion and
# call it.
if entry.deferreds:
(next_def, _) = entry.deferreds.popitem(last=False)
# we need to run the next thing in the sentinel context.
with PreserveLoggingContext():
next_def.callback(None)
elif entry.count == 0:
# We were the last thing for this key: remove it from the
# map.
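
The timeout path above builds on timeout_deferred from the same module: the waiter's deferred is wrapped so that, once the deadline passes on the Linearizer's clock, it is cancelled and the await fails with defer.TimeoutError, which the code above turns into a SynapseError. A minimal sketch of that building block in isolation (not part of the diff), using a plain Twisted fake clock instead of Synapse's test helpers:

from twisted.internet import defer
from twisted.internet.task import Clock

from synapse.util.async_helpers import timeout_deferred

fake_reactor = Clock()
waiter = defer.Deferred()  # stands in for the Linearizer's new_defer

# Wrap the waiter with a 10 second deadline, mirroring timeout=self._timeout / 1000.
wrapped = timeout_deferred(waiter, timeout=10.0, reactor=fake_reactor)

failures = []
wrapped.addErrback(failures.append)

# Nothing fires until the fake clock reaches the deadline; then the wrapped
# deferred errbacks with defer.TimeoutError.
fake_reactor.advance(10.0)
assert failures and failures[0].check(defer.TimeoutError)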

View File

@@ -21,14 +21,15 @@
from typing import Hashable, Protocol, Tuple
from twisted.internet import defer, reactor
from twisted.internet.base import ReactorBase
from twisted.internet import defer
from twisted.internet.defer import CancelledError, Deferred
from synapse.api.errors import SynapseError
from synapse.logging.context import LoggingContext, current_context
from synapse.util.async_helpers import Linearizer
from tests import unittest
from tests.server import get_clock
class UnblockFunction(Protocol):
@@ -36,6 +37,9 @@ class UnblockFunction(Protocol):
class LinearizerTestCase(unittest.TestCase):
def setUp(self) -> None:
self.reactor, self.clock = get_clock()
def _start_task(
self, linearizer: Linearizer, key: Hashable
) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]:
@@ -67,19 +71,13 @@ class LinearizerTestCase(unittest.TestCase):
# The next task, if it exists, will acquire the lock and require a kick of
# the reactor to advance.
if pump_reactor:
self._pump()
self.reactor.pump([0.0])
return d, acquired_d, unblock
def _pump(self) -> None:
"""Pump the reactor to advance `Linearizer`s."""
assert isinstance(reactor, ReactorBase)
while reactor.getDelayedCalls():
reactor.runUntilCurrent()
def test_linearizer(self) -> None:
"""Tests that a task is queued up behind an earlier task."""
linearizer = Linearizer()
linearizer = Linearizer(clock=self.clock)
key = object()
@@ -100,7 +98,7 @@ class LinearizerTestCase(unittest.TestCase):
Runs through the same scenario as `test_linearizer`.
"""
linearizer = Linearizer()
linearizer = Linearizer(clock=self.clock)
key = object()
@@ -131,7 +129,7 @@ class LinearizerTestCase(unittest.TestCase):
The stack should *not* explode when the slow thing completes.
"""
linearizer = Linearizer()
linearizer = Linearizer(clock=self.clock)
key = ""
async def func(i: int) -> None:
@@ -151,7 +149,7 @@ class LinearizerTestCase(unittest.TestCase):
def test_multiple_entries(self) -> None:
"""Tests a `Linearizer` with a concurrency above 1."""
limiter = Linearizer(max_count=3)
limiter = Linearizer(clock=self.clock, max_count=3)
key = object()
@@ -192,7 +190,7 @@ class LinearizerTestCase(unittest.TestCase):
def test_cancellation(self) -> None:
"""Tests cancellation while waiting for a `Linearizer`."""
linearizer = Linearizer()
linearizer = Linearizer(clock=self.clock)
key = object()
@@ -226,7 +224,7 @@ class LinearizerTestCase(unittest.TestCase):
def test_cancellation_during_sleep(self) -> None:
"""Tests cancellation during the sleep just after waiting for a `Linearizer`."""
linearizer = Linearizer()
linearizer = Linearizer(clock=self.clock)
key = object()
@@ -246,7 +244,7 @@ class LinearizerTestCase(unittest.TestCase):
unblock1(pump_reactor=False)
self.successResultOf(d1)
d2.cancel()
self._pump()
self.reactor.pump([0.0])
self.assertTrue(d2.called)
self.failureResultOf(d2, CancelledError)
@@ -258,3 +256,87 @@ class LinearizerTestCase(unittest.TestCase):
)
unblock3()
self.successResultOf(d3)
def test_timeout(self) -> None:
"""Test the `Linearizer` timeout behaviour."""
linearizer = Linearizer(
clock=self.clock,
timeout=10_000,
limit_error_code=999,
)
key = object()
d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
self.assertTrue(acquired_d1.called)
# Create a second task, waiting for the first task.
d2, acquired_d2, _ = self._start_task(linearizer, key)
self.assertFalse(acquired_d2.called)
self.assertFalse(d2.called)
# Wait for the timeout to occur.
self.reactor.advance(20_000)
# We should have received a timeout error for the second task, and *not*
# acquired the lock.
f = self.failureResultOf(d2, SynapseError)
self.assertEqual(f.value.code, 999)
self.assertFalse(acquired_d2.called)
# The first task should still be running.
self.assertFalse(d1.called)
# Create a third task, waiting for the first task.
d3, acquired_d3, _ = self._start_task(linearizer, key)
self.assertFalse(acquired_d3.called)
self.assertFalse(acquired_d2.called)
# Unblock the first task.
unblock1()
self.successResultOf(d1)
self.assertFalse(acquired_d2.called)
# The third task should have started running.
self.assertTrue(acquired_d3.called)
def test_max_queue_size(self) -> None:
"""Test the `Linearizer` max queue size behaviour."""
linearizer = Linearizer(
clock=self.clock,
max_queue_size=2,
limit_error_code=999,
)
key = object()
# Start four tasks. With a max queue size of 2, only two can wait behind the first.
d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
d2, acquired_d2, unblock2 = self._start_task(linearizer, key)
d3, _, unblock3 = self._start_task(linearizer, key)
d4, _, _ = self._start_task(linearizer, key)
self.assertTrue(acquired_d1.called)
self.assertFalse(d1.called)
self.assertFalse(d2.called)
self.assertFalse(d3.called)
# The fourth task should have been rejected.
self.failureResultOf(d4, SynapseError)
# Unblock the first task.
unblock1()
# Second task should now be running.
self.assertTrue(acquired_d2.called)
# Adding one more task should succeed, but further tasks should be rejected.
d5, acquired_d5, _ = self._start_task(linearizer, key)
self.assertFalse(d5.called)
d6, _, _ = self._start_task(linearizer, key)
self.failureResultOf(d6, SynapseError)
# Unblocking the second and third tasks should cause the fifth task to start running.
unblock2()
self.assertTrue(d2.called)
unblock3()
self.assertTrue(d3.called)
self.assertTrue(acquired_d5.called)
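
Taken together with the EventCreationHandler change at the top of this compare, an event-send request whose queued creation waits more than 90 seconds would now fail with the configured HTTP status. A hedged sketch of the client-visible error (the errcode is an assumption: the diff never sets one, so SynapseError's default M_UNKNOWN would apply):

# HTTP 503, with a JSON body along these lines:
expected_error_body = {
    "errcode": "M_UNKNOWN",     # assumed default, not set anywhere in this diff
    "error": "Limit exceeded",  # the msg raised by the Linearizer above
}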