mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-13 01:50:46 +00:00
Compare commits
4 Commits
quenting/t
...
erikj/bett
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6002debbde | ||
|
|
4247fa48b5 | ||
|
|
b946b028bd | ||
|
|
f401976fd8 |
@@ -76,7 +76,13 @@ from synapse.types import (
|
|||||||
create_requester,
|
create_requester,
|
||||||
)
|
)
|
||||||
from synapse.types.state import StateFilter
|
from synapse.types.state import StateFilter
|
||||||
from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError
|
from synapse.util import (
|
||||||
|
Duration,
|
||||||
|
json_decoder,
|
||||||
|
json_encoder,
|
||||||
|
log_failure,
|
||||||
|
unwrapFirstError,
|
||||||
|
)
|
||||||
from synapse.util.async_helpers import Linearizer, gather_results
|
from synapse.util.async_helpers import Linearizer, gather_results
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
@@ -506,7 +512,13 @@ class EventCreationHandler:
|
|||||||
|
|
||||||
# We limit concurrent event creation for a room to 1. This prevents state resolution
|
# We limit concurrent event creation for a room to 1. This prevents state resolution
|
||||||
# from occurring when sending bursts of events to a local room
|
# from occurring when sending bursts of events to a local room
|
||||||
self.limiter = Linearizer(max_count=1, name="room_event_creation_limit")
|
self.limiter = Linearizer(
|
||||||
|
max_count=1,
|
||||||
|
name="room_event_creation_limit",
|
||||||
|
# We timeout queued requests after 90 seconds, as the client will
|
||||||
|
# likely have timed out by then.
|
||||||
|
timeout=90 * Duration.SECOND_MS,
|
||||||
|
)
|
||||||
|
|
||||||
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
|
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
|
||||||
|
|
||||||
|
|||||||
@@ -59,7 +59,8 @@ class Duration:
|
|||||||
"""Helper class that holds constants for common time durations in
|
"""Helper class that holds constants for common time durations in
|
||||||
milliseconds."""
|
milliseconds."""
|
||||||
|
|
||||||
MINUTE_MS = 60 * 1000
|
SECOND_MS = 1000
|
||||||
|
MINUTE_MS = 60 * SECOND_MS
|
||||||
HOUR_MS = 60 * MINUTE_MS
|
HOUR_MS = 60 * MINUTE_MS
|
||||||
DAY_MS = 24 * HOUR_MS
|
DAY_MS = 24 * HOUR_MS
|
||||||
|
|
||||||
|
|||||||
@@ -59,6 +59,7 @@ from twisted.internet.defer import CancelledError
|
|||||||
from twisted.internet.interfaces import IReactorTime
|
from twisted.internet.interfaces import IReactorTime
|
||||||
from twisted.python.failure import Failure
|
from twisted.python.failure import Failure
|
||||||
|
|
||||||
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.logging.context import (
|
from synapse.logging.context import (
|
||||||
PreserveLoggingContext,
|
PreserveLoggingContext,
|
||||||
make_deferred_yieldable,
|
make_deferred_yieldable,
|
||||||
@@ -535,10 +536,19 @@ class Linearizer:
|
|||||||
name: Optional[str] = None,
|
name: Optional[str] = None,
|
||||||
max_count: int = 1,
|
max_count: int = 1,
|
||||||
clock: Optional[Clock] = None,
|
clock: Optional[Clock] = None,
|
||||||
|
timeout: Optional[int] = None,
|
||||||
|
max_queue_size: Optional[int] = None,
|
||||||
|
limit_error_code: int = 503,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
max_count: The maximum number of concurrent accesses
|
max_count: The maximum number of concurrent accesses
|
||||||
|
timeout: The maximum time to wait for a lock to be acquired, in
|
||||||
|
milliseconds. Optional.
|
||||||
|
max_queue_size: The maximum number of items for a given key that
|
||||||
|
can be queued up waiting for a lock. Optional.
|
||||||
|
limit_error_code: The HTTP error code to return when one of the
|
||||||
|
above limits is reached.
|
||||||
"""
|
"""
|
||||||
if name is None:
|
if name is None:
|
||||||
self.name: Union[str, int] = id(self)
|
self.name: Union[str, int] = id(self)
|
||||||
@@ -551,6 +561,9 @@ class Linearizer:
|
|||||||
clock = Clock(cast(IReactorTime, reactor))
|
clock = Clock(cast(IReactorTime, reactor))
|
||||||
self._clock = clock
|
self._clock = clock
|
||||||
self.max_count = max_count
|
self.max_count = max_count
|
||||||
|
self._timeout = timeout
|
||||||
|
self._max_queue_size = max_queue_size
|
||||||
|
self._limit_error_code = limit_error_code
|
||||||
|
|
||||||
# key_to_defer is a map from the key to a _LinearizerEntry.
|
# key_to_defer is a map from the key to a _LinearizerEntry.
|
||||||
self.key_to_defer: Dict[Hashable, _LinearizerEntry] = {}
|
self.key_to_defer: Dict[Hashable, _LinearizerEntry] = {}
|
||||||
@@ -594,6 +607,17 @@ class Linearizer:
|
|||||||
entry.count += 1
|
entry.count += 1
|
||||||
return entry
|
return entry
|
||||||
|
|
||||||
|
# Check if the number of deferreds waiting for this key has reached the
|
||||||
|
# maximum queue size.
|
||||||
|
if self._max_queue_size and len(entry.deferreds) >= self._max_queue_size:
|
||||||
|
logger.warning(
|
||||||
|
"Linearizer %r for key %r has reached max queue size %d",
|
||||||
|
self.name,
|
||||||
|
key,
|
||||||
|
self._max_queue_size,
|
||||||
|
)
|
||||||
|
raise SynapseError(code=self._limit_error_code, msg="Limit exceeded")
|
||||||
|
|
||||||
# Otherwise, the number of things executing is at the maximum and we have to
|
# Otherwise, the number of things executing is at the maximum and we have to
|
||||||
# add a deferred to the list of blocked items.
|
# add a deferred to the list of blocked items.
|
||||||
# When one of the things currently executing finishes it will callback
|
# When one of the things currently executing finishes it will callback
|
||||||
@@ -604,7 +628,22 @@ class Linearizer:
|
|||||||
entry.deferreds[new_defer] = 1
|
entry.deferreds[new_defer] = 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
if self._timeout:
|
||||||
|
await timeout_deferred(
|
||||||
|
new_defer,
|
||||||
|
timeout=self._timeout / 1000,
|
||||||
|
reactor=self._clock._reactor,
|
||||||
|
)
|
||||||
|
else:
|
||||||
await new_defer
|
await new_defer
|
||||||
|
except defer.TimeoutError:
|
||||||
|
logger.warning(
|
||||||
|
"Timed out waiting for linearizer lock %r for key %r",
|
||||||
|
self.name,
|
||||||
|
key,
|
||||||
|
)
|
||||||
|
del entry.deferreds[new_defer]
|
||||||
|
raise SynapseError(code=self._limit_error_code, msg="Limit exceeded")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.info("defer %r got err %r", new_defer, e)
|
logger.info("defer %r got err %r", new_defer, e)
|
||||||
if isinstance(e, CancelledError):
|
if isinstance(e, CancelledError):
|
||||||
@@ -653,12 +692,14 @@ class Linearizer:
|
|||||||
# blocked waiting to execute and start one of them
|
# blocked waiting to execute and start one of them
|
||||||
entry.count -= 1
|
entry.count -= 1
|
||||||
|
|
||||||
|
# Find the first deferred in the list that is pending completion and
|
||||||
|
# call it.
|
||||||
if entry.deferreds:
|
if entry.deferreds:
|
||||||
(next_def, _) = entry.deferreds.popitem(last=False)
|
(next_def, _) = entry.deferreds.popitem(last=False)
|
||||||
|
|
||||||
# we need to run the next thing in the sentinel context.
|
# we need to run the next thing in the sentinel context.
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
next_def.callback(None)
|
next_def.callback(None)
|
||||||
|
|
||||||
elif entry.count == 0:
|
elif entry.count == 0:
|
||||||
# We were the last thing for this key: remove it from the
|
# We were the last thing for this key: remove it from the
|
||||||
# map.
|
# map.
|
||||||
|
|||||||
@@ -21,14 +21,15 @@
|
|||||||
|
|
||||||
from typing import Hashable, Protocol, Tuple
|
from typing import Hashable, Protocol, Tuple
|
||||||
|
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer
|
||||||
from twisted.internet.base import ReactorBase
|
|
||||||
from twisted.internet.defer import CancelledError, Deferred
|
from twisted.internet.defer import CancelledError, Deferred
|
||||||
|
|
||||||
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.logging.context import LoggingContext, current_context
|
from synapse.logging.context import LoggingContext, current_context
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
|
from tests.server import get_clock
|
||||||
|
|
||||||
|
|
||||||
class UnblockFunction(Protocol):
|
class UnblockFunction(Protocol):
|
||||||
@@ -36,6 +37,9 @@ class UnblockFunction(Protocol):
|
|||||||
|
|
||||||
|
|
||||||
class LinearizerTestCase(unittest.TestCase):
|
class LinearizerTestCase(unittest.TestCase):
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.reactor, self.clock = get_clock()
|
||||||
|
|
||||||
def _start_task(
|
def _start_task(
|
||||||
self, linearizer: Linearizer, key: Hashable
|
self, linearizer: Linearizer, key: Hashable
|
||||||
) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]:
|
) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]:
|
||||||
@@ -67,19 +71,13 @@ class LinearizerTestCase(unittest.TestCase):
|
|||||||
# The next task, if it exists, will acquire the lock and require a kick of
|
# The next task, if it exists, will acquire the lock and require a kick of
|
||||||
# the reactor to advance.
|
# the reactor to advance.
|
||||||
if pump_reactor:
|
if pump_reactor:
|
||||||
self._pump()
|
self.reactor.pump([0.0])
|
||||||
|
|
||||||
return d, acquired_d, unblock
|
return d, acquired_d, unblock
|
||||||
|
|
||||||
def _pump(self) -> None:
|
|
||||||
"""Pump the reactor to advance `Linearizer`s."""
|
|
||||||
assert isinstance(reactor, ReactorBase)
|
|
||||||
while reactor.getDelayedCalls():
|
|
||||||
reactor.runUntilCurrent()
|
|
||||||
|
|
||||||
def test_linearizer(self) -> None:
|
def test_linearizer(self) -> None:
|
||||||
"""Tests that a task is queued up behind an earlier task."""
|
"""Tests that a task is queued up behind an earlier task."""
|
||||||
linearizer = Linearizer()
|
linearizer = Linearizer(clock=self.clock)
|
||||||
|
|
||||||
key = object()
|
key = object()
|
||||||
|
|
||||||
@@ -100,7 +98,7 @@ class LinearizerTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
Runs through the same scenario as `test_linearizer`.
|
Runs through the same scenario as `test_linearizer`.
|
||||||
"""
|
"""
|
||||||
linearizer = Linearizer()
|
linearizer = Linearizer(clock=self.clock)
|
||||||
|
|
||||||
key = object()
|
key = object()
|
||||||
|
|
||||||
@@ -131,7 +129,7 @@ class LinearizerTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
The stack should *not* explode when the slow thing completes.
|
The stack should *not* explode when the slow thing completes.
|
||||||
"""
|
"""
|
||||||
linearizer = Linearizer()
|
linearizer = Linearizer(clock=self.clock)
|
||||||
key = ""
|
key = ""
|
||||||
|
|
||||||
async def func(i: int) -> None:
|
async def func(i: int) -> None:
|
||||||
@@ -151,7 +149,7 @@ class LinearizerTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
def test_multiple_entries(self) -> None:
|
def test_multiple_entries(self) -> None:
|
||||||
"""Tests a `Linearizer` with a concurrency above 1."""
|
"""Tests a `Linearizer` with a concurrency above 1."""
|
||||||
limiter = Linearizer(max_count=3)
|
limiter = Linearizer(clock=self.clock, max_count=3)
|
||||||
|
|
||||||
key = object()
|
key = object()
|
||||||
|
|
||||||
@@ -192,7 +190,7 @@ class LinearizerTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
def test_cancellation(self) -> None:
|
def test_cancellation(self) -> None:
|
||||||
"""Tests cancellation while waiting for a `Linearizer`."""
|
"""Tests cancellation while waiting for a `Linearizer`."""
|
||||||
linearizer = Linearizer()
|
linearizer = Linearizer(clock=self.clock)
|
||||||
|
|
||||||
key = object()
|
key = object()
|
||||||
|
|
||||||
@@ -226,7 +224,7 @@ class LinearizerTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
def test_cancellation_during_sleep(self) -> None:
|
def test_cancellation_during_sleep(self) -> None:
|
||||||
"""Tests cancellation during the sleep just after waiting for a `Linearizer`."""
|
"""Tests cancellation during the sleep just after waiting for a `Linearizer`."""
|
||||||
linearizer = Linearizer()
|
linearizer = Linearizer(clock=self.clock)
|
||||||
|
|
||||||
key = object()
|
key = object()
|
||||||
|
|
||||||
@@ -246,7 +244,7 @@ class LinearizerTestCase(unittest.TestCase):
|
|||||||
unblock1(pump_reactor=False)
|
unblock1(pump_reactor=False)
|
||||||
self.successResultOf(d1)
|
self.successResultOf(d1)
|
||||||
d2.cancel()
|
d2.cancel()
|
||||||
self._pump()
|
self.reactor.pump([0.0])
|
||||||
|
|
||||||
self.assertTrue(d2.called)
|
self.assertTrue(d2.called)
|
||||||
self.failureResultOf(d2, CancelledError)
|
self.failureResultOf(d2, CancelledError)
|
||||||
@@ -258,3 +256,87 @@ class LinearizerTestCase(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
unblock3()
|
unblock3()
|
||||||
self.successResultOf(d3)
|
self.successResultOf(d3)
|
||||||
|
|
||||||
|
def test_timeout(self) -> None:
|
||||||
|
"""Test the `Linearizer` timeout behaviour."""
|
||||||
|
linearizer = Linearizer(
|
||||||
|
clock=self.clock,
|
||||||
|
timeout=10_000,
|
||||||
|
limit_error_code=999,
|
||||||
|
)
|
||||||
|
key = object()
|
||||||
|
d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
|
||||||
|
self.assertTrue(acquired_d1.called)
|
||||||
|
|
||||||
|
# Create a second task, waiting for the first task.
|
||||||
|
d2, acquired_d2, _ = self._start_task(linearizer, key)
|
||||||
|
self.assertFalse(acquired_d2.called)
|
||||||
|
self.assertFalse(d2.called)
|
||||||
|
|
||||||
|
# Wait for the timeout to occur.
|
||||||
|
self.reactor.advance(20_000)
|
||||||
|
|
||||||
|
# We should have received a timeout error for the second task, and *not*
|
||||||
|
# acquired the lock.
|
||||||
|
f = self.failureResultOf(d2, SynapseError)
|
||||||
|
self.assertEqual(f.value.code, 999)
|
||||||
|
self.assertFalse(acquired_d2.called)
|
||||||
|
|
||||||
|
# The first task should still be running.
|
||||||
|
self.assertFalse(d1.called)
|
||||||
|
|
||||||
|
# Create a third task, waiting for the first task.
|
||||||
|
d3, acquired_d3, _ = self._start_task(linearizer, key)
|
||||||
|
self.assertFalse(acquired_d3.called)
|
||||||
|
self.assertFalse(acquired_d2.called)
|
||||||
|
|
||||||
|
# Unblock the first task.
|
||||||
|
unblock1()
|
||||||
|
self.successResultOf(d1)
|
||||||
|
self.assertFalse(acquired_d2.called)
|
||||||
|
|
||||||
|
# The third task should have started running.
|
||||||
|
self.assertTrue(acquired_d3.called)
|
||||||
|
|
||||||
|
def test_max_queue_size(self) -> None:
|
||||||
|
"""Test the `Linearizer` max queue size behaviour."""
|
||||||
|
linearizer = Linearizer(
|
||||||
|
clock=self.clock,
|
||||||
|
max_queue_size=2,
|
||||||
|
limit_error_code=999,
|
||||||
|
)
|
||||||
|
key = object()
|
||||||
|
|
||||||
|
# Start three tasks.
|
||||||
|
d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
|
||||||
|
d2, acquired_d2, unblock2 = self._start_task(linearizer, key)
|
||||||
|
d3, _, unblock3 = self._start_task(linearizer, key)
|
||||||
|
d4, _, _ = self._start_task(linearizer, key)
|
||||||
|
|
||||||
|
self.assertTrue(acquired_d1.called)
|
||||||
|
self.assertFalse(d1.called)
|
||||||
|
self.assertFalse(d2.called)
|
||||||
|
self.assertFalse(d3.called)
|
||||||
|
|
||||||
|
# The fourth task should have been rejected.
|
||||||
|
self.failureResultOf(d4, SynapseError)
|
||||||
|
|
||||||
|
# Unblock the first task.
|
||||||
|
unblock1()
|
||||||
|
|
||||||
|
# Second task should now be running.
|
||||||
|
self.assertTrue(acquired_d2.called)
|
||||||
|
|
||||||
|
# Adding one more task should succeed, but further tasks should be rejected.
|
||||||
|
d5, acquired_d5, _ = self._start_task(linearizer, key)
|
||||||
|
self.assertFalse(d5.called)
|
||||||
|
|
||||||
|
d6, _, _ = self._start_task(linearizer, key)
|
||||||
|
self.failureResultOf(d6, SynapseError)
|
||||||
|
|
||||||
|
# Unblock the second and third task should cause the fifth task to start running.
|
||||||
|
unblock2()
|
||||||
|
self.assertTrue(d2.called)
|
||||||
|
unblock3()
|
||||||
|
self.assertTrue(d3.called)
|
||||||
|
self.assertTrue(acquired_d5.called)
|
||||||
|
|||||||
Reference in New Issue
Block a user