Compare commits

...

3 Commits

Author SHA1 Message Date
Erik Johnston
2f0c33a540 dfasd 2021-01-28 20:03:36 +00:00
Erik Johnston
ccdfa36131 Fixup 2021-01-28 19:42:41 +00:00
Erik Johnston
a1b6dea0b7 Add smoother 2021-01-28 19:28:22 +00:00
2 changed files with 136 additions and 2 deletions

View File

@@ -18,6 +18,7 @@ import collections
import inspect
import logging
from contextlib import contextmanager
from collections import deque
from typing import (
Any,
Awaitable,
@@ -30,6 +31,7 @@ from typing import (
Set,
TypeVar,
Union,
Deque,
)
import attr
@@ -37,7 +39,7 @@ from typing_extensions import ContextManager
from twisted.internet import defer
from twisted.internet.defer import CancelledError
from twisted.internet.interfaces import IReactorTime
from twisted.internet.interfaces import IReactorTime, IDelayedCall
from twisted.python import failure
from synapse.logging.context import (
@@ -552,3 +554,84 @@ def maybe_awaitable(value: Union[Awaitable[R], R]) -> Awaitable[R]:
return value
return DoneAwaitable(value)
@attr.s(slots=True)
class _SmootherEntry:
scheduled_at_ms = attr.ib(type=int)
scheduled_for_ms = attr.ib(type=int)
defer = attr.ib(type=defer.Deferred)
@attr.s(slots=True)
class Smoother:
_reactor = attr.ib(type=IReactorTime)
_target_ms = attr.ib(type=int)
_queue = attr.ib(type=Deque[_SmootherEntry], factory=deque)
_last_run = attr.ib(type=int, default=0)
_next_call = attr.ib(type=Optional[IDelayedCall], default=None)
def _fire_next(self):
if not self._queue:
return
self._next_call = None
entry = self._queue.popleft()
entry.defer.callback(None)
async def smooth(self) -> None:
now = self._reactor.seconds() * 1000.0
if not self._queue:
scheduled_for_ms = (now + self._target_ms + self._last_run) / 2
if scheduled_for_ms <= now:
self._last_run = now
return
entry = _SmootherEntry(
scheduled_at_ms=now,
scheduled_for_ms=scheduled_for_ms,
defer=defer.Deferred(),
)
self._queue.append(entry)
else:
last_entry = self._queue[-1]
scheduled_for_ms = (now + self._target_ms + last_entry.scheduled_for_ms) / 2
entry = _SmootherEntry(
scheduled_at_ms=now,
scheduled_for_ms=scheduled_for_ms,
defer=defer.Deferred(),
)
self._queue.append(entry)
step = self._target_ms / (len(self._queue) + 1)
for idx, entry in enumerate(self._queue):
new_time = now + (idx + 1) * step
if new_time < entry.scheduled_for_ms:
entry.scheduled_for_ms = new_time
if self._next_call and not self._next_call.active:
self._next_call.reset(
max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0
)
else:
self._next_call = self._reactor.callLater(
max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next
)
await make_deferred_yieldable(entry.defer)
now = self._reactor.seconds() * 1000.0
self._last_run = now
if self._queue:
self._next_call = self._reactor.callLater(
max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next,
)
return

View File

@@ -22,7 +22,7 @@ from synapse.logging.context import (
PreserveLoggingContext,
current_context,
)
from synapse.util.async_helpers import timeout_deferred
from synapse.util.async_helpers import timeout_deferred, Smoother
from tests.unittest import TestCase
@@ -105,3 +105,54 @@ class TimeoutDeferredTest(TestCase):
)
self.failureResultOf(timing_out_d, defer.TimeoutError)
self.assertIs(current_context(), context_one)
class TestSmoother(TestCase):
def setUp(self):
self.clock = Clock()
self.smoother = Smoother(self.clock, 10 * 1000)
def test_first(self):
self.clock.advance(100)
d = self.smoother.smooth()
self.successResultOf(d)
def test_multiple_at_same_time(self):
self.clock.advance(100)
d1 = defer.ensureDeferred(self.smoother.smooth())
self.successResultOf(d1)
d2 = defer.ensureDeferred(self.smoother.smooth())
self.assertNoResult(d2)
self.assertAlmostEqual(
self.smoother._queue[0].scheduled_for_ms,
self.clock.seconds() * 1000 + self.smoother._target_ms / 2,
)
d3 = defer.ensureDeferred(self.smoother.smooth())
self.assertNoResult(d3)
self.assertAlmostEqual(
self.smoother._queue[0].scheduled_for_ms,
self.clock.seconds() * 1000 + self.smoother._target_ms / 3,
)
self.assertAlmostEqual(
self.smoother._queue[1].scheduled_for_ms,
self.clock.seconds() * 1000 + 2 * self.smoother._target_ms / 3,
)
self.clock.advance(4)
self.successResultOf(d2)
self.assertNoResult(d3)
self.clock.advance(0)
self.assertNoResult(d3)
self.clock.advance(4)
self.successResultOf(d3)
self.clock.advance(100)
self.assertNot(self.smoother._queue)