Compare commits

...

7 Commits

Author          SHA1        Message                            Date
Erik Johnston   e49ee3dd1c  Newsfile                           2024-09-27 10:36:13 +01:00
Erik Johnston   e52725b2ac  Optimise notifier mk2              2024-09-27 10:32:58 +01:00
Erik Johnston   210f76f4b9  Fix up doc string                  2024-09-27 10:23:37 +01:00
Erik Johnston   e85f8f77c2  Fix typing tests                   2024-09-27 10:19:42 +01:00
Erik Johnston   6452e22e8f  Newsfile                           2024-09-27 09:53:33 +01:00
Erik Johnston   9d3e8d7fcd  Calculate new current token once   2024-09-27 09:13:43 +01:00
                            (Turns out doing `.copy_and_advance` can be expensive.)
Erik Johnston   a3f8ec284a  Move metrics out of hot path       2024-09-27 09:12:54 +01:00
                            (We can update the counter once outside of the loop.)
4 changed files with 100 additions and 65 deletions
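Taken together, these commits change the notifier so that the new StreamToken is computed once per event (instead of each user stream calling the comparatively expensive `.copy_and_advance`), and so that each user stream holds a plain set of Deferreds rather than an ObservableDeferred that was re-created on every wake-up. Below is a minimal sketch of the resulting shape in plain Twisted; `UserStream` and `on_new_event` are simplified stand-ins for Synapse's `_NotifierUserStream` and `Notifier.on_new_event`, with an `int` standing in for `StreamToken`:

    from typing import List, Set

    from twisted.internet import defer
    from twisted.internet.defer import Deferred

    class UserStream:
        def __init__(self, current_token: int) -> None:
            self.current_token = current_token
            # Plain deferreds replace the per-user ObservableDeferred; the
            # canceller stops the set leaking entries when a wait times out.
            self.listeners: Set["Deferred[int]"] = set()

        def new_listener(self, token: int) -> "Deferred[int]":
            if token != self.current_token:
                # Something already happened since the caller's token.
                return defer.succeed(self.current_token)
            d: "Deferred[int]" = Deferred(canceller=lambda d: self.listeners.discard(d))
            self.listeners.add(d)
            return d

        def update_and_fetch_deferreds(self, current_token: int) -> List["Deferred[int]"]:
            # Only swap state here; the caller fires the deferreds in one batch.
            self.current_token = current_token
            listeners, self.listeners = self.listeners, set()
            return list(listeners)

    def on_new_event(streams: List[UserStream], current_token: int) -> None:
        # The token is computed once by the caller, not advanced per stream,
        # and every listener is resolved in a single pass.
        to_wake: List["Deferred[int]"] = []
        for stream in streams:
            to_wake.extend(stream.update_and_fetch_deferreds(current_token))
        for d in to_wake:
            d.callback(current_token)

A plain Deferred can only fire once, so handing the whole listener set back to the caller and resetting it gives the same one-shot semantics the ObservableDeferred provided, without its per-observer bookkeeping.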

changelog.d/17765.misc (new file)

@@ -0,0 +1 @@
+Increase performance of the notifier when there are many syncing users.

changelog.d/17766.misc (new file)

@@ -0,0 +1 @@
+Increase performance of the notifier when there are many syncing users.


@@ -41,6 +41,7 @@ import attr
 from prometheus_client import Counter

 from twisted.internet import defer
+from twisted.internet.defer import Deferred

 from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
 from synapse.api.errors import AuthError
@@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span
 from synapse.metrics import LaterGauge
 from synapse.streams.config import PaginationConfig
 from synapse.types import (
+    ISynapseReactor,
     JsonDict,
     MultiWriterStreamToken,
     PersistedEventPosition,
@@ -61,8 +63,11 @@ from synapse.types import (
     StreamToken,
     UserID,
 )
-from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
+from synapse.util.async_helpers import (
+    timeout_deferred,
+)
 from synapse.util.metrics import Measure
 from synapse.util.stringutils import shortstr
 from synapse.visibility import filter_events_for_client

 if TYPE_CHECKING:
@@ -89,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
     return n


-class _NotificationListener:
-    """This represents a single client connection to the events stream.
-
-    The events stream handler will have yielded to the deferred, so to
-    notify the handler it is sufficient to resolve the deferred.
-    """
-
-    __slots__ = ["deferred"]
-
-    def __init__(self, deferred: "defer.Deferred"):
-        self.deferred = deferred
-

 class _NotifierUserStream:
     """This represents a user connected to the event stream.

     It tracks the most recent stream token for that user.
@@ -113,59 +106,49 @@ class _NotifierUserStream:
     def __init__(
         self,
+        reactor: ISynapseReactor,
         user_id: str,
         rooms: StrCollection,
         current_token: StreamToken,
         time_now_ms: int,
     ):
+        self.reactor = reactor
         self.user_id = user_id
         self.rooms = set(rooms)
-        self.current_token = current_token

-        # The last token for which we should wake up any streams that have a
-        # token that comes before it. This gets updated every time we get poked.
-        # We start it at the current token since if we get any streams
-        # that have a token from before we have no idea whether they should be
-        # woken up or not, so lets just wake them up.
-        self.last_notified_token = current_token
+        self.current_token = current_token
         self.last_notified_ms = time_now_ms

-        self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
-            defer.Deferred()
-        )
+        # Set of listeners that we need to wake up when there has been a change.
+        self.listeners: Set[Deferred[StreamToken]] = set()

-    def notify(
+    def update_and_fetch_deferreds(
         self,
-        stream_key: StreamKeyType,
-        stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
+        current_token: StreamToken,
         time_now_ms: int,
-    ) -> None:
-        """Notify any listeners for this user of a new event from an
-        event source.
+    ) -> Collection["Deferred[StreamToken]"]:
+        """Update the stream for this user because of a new event from an
+        event source, and return the set of deferreds to wake up.

         Args:
-            stream_key: The stream the event came from.
-            stream_id: The new id for the stream the event came from.
+            current_token: The new current token.
             time_now_ms: The current time in milliseconds.
+
+        Returns:
+            The set of deferreds that need to be called.
         """
-        self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
-        self.last_notified_token = self.current_token
+        self.current_token = current_token
         self.last_notified_ms = time_now_ms
-        notify_deferred = self.notify_deferred

-        log_kv(
-            {
-                "notify": self.user_id,
-                "stream": stream_key,
-                "stream_id": stream_id,
-                "listeners": self.count_listeners(),
-            }
-        )
-
-        users_woken_by_stream_counter.labels(stream_key).inc()
-
-        with PreserveLoggingContext():
-            self.notify_deferred = ObservableDeferred(defer.Deferred())
-            notify_deferred.callback(self.current_token)
+        listeners = self.listeners
+        self.listeners = set()
+
+        return listeners

     def remove(self, notifier: "Notifier") -> None:
         """Remove this listener from all the indexes in the Notifier
@@ -179,9 +162,9 @@ class _NotifierUserStream:
         notifier.user_to_user_stream.pop(self.user_id)

     def count_listeners(self) -> int:
-        return len(self.notify_deferred.observers())
+        return len(self.listeners)

-    def new_listener(self, token: StreamToken) -> _NotificationListener:
+    def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
         """Returns a deferred that is resolved when there is a new token
         greater than the given token.
@@ -191,10 +174,17 @@ class _NotifierUserStream:
         """
         # Immediately wake up stream if something has already happened
         # since their last token.
-        if self.last_notified_token != token:
-            return _NotificationListener(defer.succeed(self.current_token))
-        else:
-            return _NotificationListener(self.notify_deferred.observe())
+        if token != self.current_token:
+            return defer.succeed(self.current_token)
+
+        # Create a new deferred and add it to the set of listeners. We add a
+        # cancel handler to remove it from the set again, to handle timeouts.
+        deferred: "Deferred[StreamToken]" = Deferred(
+            canceller=lambda d: self.listeners.discard(d)
+        )
+        self.listeners.add(deferred)
+
+        return deferred


 @attr.s(slots=True, frozen=True, auto_attribs=True)
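The cancel handler in the hunk above is what keeps the listener set from leaking when a long-poll times out: `timeout_deferred` cancels the deferred, and Twisted then invokes the canceller. A tiny self-contained demonstration of that mechanism (variable names are illustrative):

    from twisted.internet.defer import CancelledError, Deferred

    listeners: set = set()

    d: Deferred = Deferred(canceller=lambda d: listeners.discard(d))
    listeners.add(d)
    assert len(listeners) == 1

    d.addErrback(lambda f: f.trap(CancelledError))  # swallow the cancellation
    d.cancel()  # what a timeout ultimately does
    assert len(listeners) == 0  # the canceller removed it from the set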
@@ -247,6 +237,7 @@ class Notifier:
         # List of callbacks to be notified when a lock is released
         self._lock_released_callback: List[Callable[[str, str, str], None]] = []

+        self.reactor = hs.get_reactor()
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
         self._pusher_pool = hs.get_pusherpool()
@@ -342,14 +333,25 @@ class Notifier:
         # Wake up all related user stream notifiers
         user_streams = self.room_to_user_streams.get(room_id, set())
         time_now_ms = self.clock.time_msec()
+        current_token = self.event_sources.get_current_token()
+
+        listeners: List["Deferred[StreamToken]"] = []
         for user_stream in user_streams:
             try:
-                user_stream.notify(
-                    StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
+                listeners.extend(
+                    user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
                 )
             except Exception:
                 logger.exception("Failed to notify listener")

+        with PreserveLoggingContext():
+            for listener in listeners:
+                listener.callback(current_token)
+
+        users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
+            len(user_streams)
+        )
+
         # Poke the replication so that other workers also see the write to
         # the un-partial-stated rooms stream.
         self.notify_replication()
@@ -519,12 +521,16 @@ class Notifier:
         rooms = rooms or []

         with Measure(self.clock, "on_new_event"):
-            user_streams = set()
+            user_streams: Set[_NotifierUserStream] = set()

             log_kv(
                 {
                     "waking_up_explicit_users": len(users),
                     "waking_up_explicit_rooms": len(rooms),
+                    "users": shortstr(users),
+                    "rooms": shortstr(rooms),
+                    "stream": stream_key,
+                    "stream_id": new_token,
                 }
             )
@@ -544,12 +550,27 @@ class Notifier:
             )

             time_now_ms = self.clock.time_msec()
+            current_token = self.event_sources.get_current_token()
+
+            listeners: List["Deferred[StreamToken]"] = []
             for user_stream in user_streams:
                 try:
-                    user_stream.notify(stream_key, new_token, time_now_ms)
+                    listeners.extend(
+                        user_stream.update_and_fetch_deferreds(
+                            current_token, time_now_ms
+                        )
+                    )
                 except Exception:
                     logger.exception("Failed to notify listener")

+            # We resolve all these deferreds in one go so that we only need to
+            # call `PreserveLoggingContext` once, as it has a bunch of overhead
+            # (to calculate performance stats)
+            with PreserveLoggingContext():
+                for listener in listeners:
+                    listener.callback(current_token)
+
+            users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
+
             self.notify_replication()

             # Notify appservices.
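The new comment above carries the key reasoning: `PreserveLoggingContext` has a fixed per-entry cost (the diff's comment attributes it to performance-stat calculation), so all the deferreds are resolved inside a single `with` block instead of entering the context once per listener. The shape of that batching, with a stand-in context manager rather than Synapse's real `PreserveLoggingContext`:

    from contextlib import contextmanager
    from typing import Iterable, Iterator

    from twisted.internet.defer import Deferred

    @contextmanager
    def expensive_context() -> Iterator[None]:  # stand-in for PreserveLoggingContext
        # imagine per-entry bookkeeping (e.g. perf-stat snapshots) here
        yield

    def fire_all(listeners: Iterable[Deferred], token: object) -> None:
        with expensive_context():  # entered once per batch...
            for d in listeners:  # ...not once per listener
                d.callback(token)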
@@ -586,6 +607,7 @@ class Notifier:
         if room_ids is None:
             room_ids = await self.store.get_rooms_for_user(user_id)
         user_stream = _NotifierUserStream(
+            reactor=self.reactor,
             user_id=user_id,
             rooms=room_ids,
             current_token=current_token,
@@ -608,8 +630,8 @@ class Notifier:
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
-                    listener.deferred = timeout_deferred(
-                        listener.deferred,
+                    listener = timeout_deferred(
+                        listener,
                         (end_time - now) / 1000.0,
                         self.hs.get_reactor(),
                     )
@@ -622,7 +644,7 @@ class Notifier:
                     )

                     with PreserveLoggingContext():
-                        await listener.deferred
+                        await listener

                     log_kv(
                         {


@@ -282,22 +282,33 @@ class SyncTypingTests(unittest.HomeserverTestCase):
         self.assertEqual(200, channel.code)
         next_batch = channel.json_body["next_batch"]

-        # This should time out! But it does not, because our stream token is
-        # ahead, and therefore it's saying the typing (that we've actually
-        # already seen) is new, since it's got a token above our new, now-reset
-        # stream token.
-        channel = self.make_request("GET", sync_url % (access_token, next_batch))
-        self.assertEqual(200, channel.code)
-        next_batch = channel.json_body["next_batch"]
-
         # Clear the typing information, so that it doesn't think everything is
-        # in the future.
+        # in the future. This happens automatically when the typing stream
+        # resets.
         typing._reset()

-        # Now it SHOULD fail as it never completes!
+        # Nothing new, so we time out.
         with self.assertRaises(TimedOutException):
             self.make_request("GET", sync_url % (access_token, next_batch))

+        # Sync and start typing again.
+        sync_channel = self.make_request(
+            "GET", sync_url % (access_token, next_batch), await_result=False
+        )
+        self.assertFalse(sync_channel.is_finished())
+
+        channel = self.make_request(
+            "PUT",
+            typing_url % (room, other_user_id, other_access_token),
+            b'{"typing": true, "timeout": 30000}',
+        )
+        self.assertEqual(200, channel.code)
+
+        # Sync should now return.
+        sync_channel.await_result()
+        self.assertEqual(200, sync_channel.code)
+        next_batch = sync_channel.json_body["next_batch"]

 class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin):
     servlets = [
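The rewritten test exercises this through the full HTTP stack, but the behaviour it pins down, namely that a listener created at the current token stays pending until something advances it, can also be checked directly against the sketch classes introduced after the changed-files summary (illustrative code, not Synapse's test harness):

    def test_listener_wakes_on_new_token() -> None:
        stream = UserStream(current_token=1)

        d = stream.new_listener(token=1)
        assert not d.called  # nothing new yet: the long-poll would block

        on_new_event([stream], current_token=2)
        assert d.called and d.result == 2  # woken with the new token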