Compare commits

...

6 Commits

Author SHA1 Message Date
Erik Johnston
68b9eb694f Tidy up hs.get_federation_sender() calls 2021-04-15 14:13:45 +01:00
Erik Johnston
9f1a20f0c2 Remove the federation replication stream and associated commands 2021-04-15 14:13:18 +01:00
Erik Johnston
7575a686fd Fix module api 2021-04-15 13:49:21 +01:00
Erik Johnston
40bc8774c8 Newsfile 2021-04-15 12:04:13 +01:00
Erik Johnston
5c63b653c8 Add a presence federation replication stream 2021-04-15 12:03:23 +01:00
Erik Johnston
8f566077fb Always use send_presence_to_destinations rather than send_presence
This a) reduces the API surface and b) means that we calculate where to
send presence on the presence writer, rather than federation senders.
2021-04-15 11:33:54 +01:00
21 changed files with 455 additions and 1067 deletions

1
changelog.d/9819.feature Normal file
View File

@@ -0,0 +1 @@
Add experimental support for handling presence on a worker.

View File

@@ -216,10 +216,6 @@ Asks the server for the current position of all streams.
This is used when a worker is shutting down.
#### FEDERATION_ACK (C)
Acknowledge receipt of some federation data
### REMOTE_SERVER_UP (S, C)
Inform other processes that a remote server may have come back online.

View File

@@ -1,576 +0,0 @@
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A federation sender that forwards things to be sent across replication to
a worker process.
It assumes there is a single worker process feeding off of it.
Each row in the replication stream consists of a type and some json, where the
types indicate whether they are presence, or edus, etc.
Ephemeral or non-event data are queued up in-memory. When the worker requests
updates since a particular point, all in-memory data since before that point is
dropped. We also expire things in the queue after 5 minutes, to ensure that a
dead worker doesn't cause the queues to grow limitlessly.
Events are replicated via a separate events stream.
"""
import logging
from collections import namedtuple
from typing import (
TYPE_CHECKING,
Dict,
Hashable,
Iterable,
List,
Optional,
Sized,
Tuple,
Type,
)
from sortedcontainers import SortedDict
from synapse.api.presence import UserPresenceState
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure
from .units import Edu
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
# We may have multiple federation sender instances, so we need to track
# their positions separately.
self._sender_instances = hs.config.worker.federation_shard_config.instances
self._sender_positions = {} # type: Dict[str, int]
# Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState]
# Stream position -> list[user_id]
self.presence_changed = SortedDict() # type: SortedDict[int, List[str]]
# Stores the destinations we need to explicitly send presence to about a
# given user.
# Stream position -> (user_id, destinations)
self.presence_destinations = (
SortedDict()
) # type: SortedDict[int, Tuple[str, Iterable[str]]]
# (destination, key) -> EDU
self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
# stream position -> (destination, key)
self.keyed_edu_changed = (
SortedDict()
) # type: SortedDict[int, Tuple[str, tuple]]
self.edus = SortedDict() # type: SortedDict[int, Edu]
# stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
self.pos = 1
# map from stream ID to the time that stream entry was generated, so that we
# can clear out entries after a while
self.pos_time = SortedDict() # type: SortedDict[int, int]
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name: str, queue: Sized) -> None:
LaterGauge(
"synapse_federation_send_queue_%s_size" % (queue_name,),
"",
[],
lambda: len(queue),
)
for queue_name in [
"presence_map",
"presence_changed",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
self.clock.looping_call(self._clear_queue, 30 * 1000)
def _next_pos(self) -> int:
pos = self.pos
self.pos += 1
self.pos_time[self.clock.time_msec()] = pos
return pos
def _clear_queue(self) -> None:
"""Clear the queues for anything older than N minutes"""
FIVE_MINUTES_AGO = 5 * 60 * 1000
now = self.clock.time_msec()
keys = self.pos_time.keys()
time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
if not keys[:time]:
return
position_to_delete = max(keys[:time])
for key in keys[:time]:
del self.pos_time[key]
self._clear_queue_before_pos(position_to_delete)
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
i = self.presence_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]
user_ids = {
user_id for uids in self.presence_changed.values() for user_id in uids
}
keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_destinations[key]
user_ids.update(
user_id for user_id, _ in self.presence_destinations.values()
)
to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
]
for user_id in to_del:
del self.presence_map[user_id]
# Delete things out of keyed edus
keys = self.keyed_edu_changed.keys()
i = self.keyed_edu_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.keyed_edu_changed[key]
live_keys = set()
for edu_key in self.keyed_edu_changed.values():
live_keys.add(edu_key)
keys_to_del = [
edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
]
for edu_key in keys_to_del:
del self.keyed_edu[edu_key]
# Delete things out of edu map
keys = self.edus.keys()
i = self.edus.bisect_left(position_to_delete)
for key in keys[:i]:
del self.edus[key]
def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""As per FederationSender"""
# This should never get called.
raise NotImplementedError()
def build_and_send_edu(
self,
destination: str,
edu_type: str,
content: JsonDict,
key: Optional[Hashable] = None,
) -> None:
"""As per FederationSender"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
pos = self._next_pos()
edu = Edu(
origin=self.server_name,
destination=destination,
edu_type=edu_type,
content=content,
)
if key:
assert isinstance(key, tuple)
self.keyed_edu[(destination, key)] = edu
self.keyed_edu_changed[pos] = (destination, key)
else:
self.edus[pos] = edu
self.notifier.on_new_replication_data()
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""As per FederationSender
Args:
receipt:
"""
# nothing to do here: the replication listener will handle it.
def send_presence(self, states: List[UserPresenceState]) -> None:
"""As per FederationSender
Args:
states
"""
pos = self._next_pos()
# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = [s for s in states if self.is_mine_id(s.user_id)]
self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]
self.notifier.on_new_replication_data()
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""As per FederationSender
Args:
states
destinations
"""
for state in states:
pos = self._next_pos()
self.presence_map.update({state.user_id: state for state in states})
self.presence_destinations[pos] = (state.user_id, destinations)
self.notifier.on_new_replication_data()
def send_device_messages(self, destination: str) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
def wake_destination(self, server: str) -> None:
pass
def get_current_token(self) -> int:
return self.pos - 1
def federation_ack(self, instance_name: str, token: int) -> None:
if self._sender_instances:
# If we have configured multiple federation sender instances we need
# to track their positions separately, and only clear the queue up
# to the token all instances have acked.
self._sender_positions[instance_name] = token
token = min(self._sender_positions.values())
self._clear_queue_before_pos(token)
async def get_replication_rows(
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
"""Get rows to be sent over federation between the two tokens
Args:
instance_name: the name of the current process
from_token: the previous stream token: the starting point for fetching the
updates
to_token: the new stream token: the point to get updates up to
target_row_count: a target for the number of rows to be returned.
Returns: a triplet `(updates, new_last_token, limited)`, where:
* `updates` is a list of `(token, row)` entries.
* `new_last_token` is the new position in stream.
* `limited` is whether there are more updates to fetch.
"""
# TODO: Handle target_row_count.
# To handle restarts where we wrap around
if from_token > self.pos:
from_token = -1
# list of tuple(int, BaseFederationRow), where the first is the position
# of the federation stream.
rows = [] # type: List[Tuple[int, BaseFederationRow]]
# Fetch changed presence
i = self.presence_changed.bisect_right(from_token)
j = self.presence_changed.bisect_right(to_token) + 1
dest_user_ids = [
(pos, user_id)
for pos, user_id_list in self.presence_changed.items()[i:j]
for user_id in user_id_list
]
for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(state=self.presence_map[user_id])))
# Fetch presence to send to destinations
i = self.presence_destinations.bisect_right(from_token)
j = self.presence_destinations.bisect_right(to_token) + 1
for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
rows.append(
(
pos,
PresenceDestinationsRow(
state=self.presence_map[user_id], destinations=list(dests)
),
)
)
# Fetch changes keyed edus
i = self.keyed_edu_changed.bisect_right(from_token)
j = self.keyed_edu_changed.bisect_right(to_token) + 1
# We purposefully clobber based on the key here, python dict comprehensions
# always use the last value, so this will correctly point to the last
# stream position.
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
for ((destination, edu_key), pos) in keyed_edus.items():
rows.append(
(
pos,
KeyedEduRow(
key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
),
)
)
# Fetch changed edus
i = self.edus.bisect_right(from_token)
j = self.edus.bisect_right(to_token) + 1
edus = self.edus.items()[i:j]
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
# Sort rows based on pos
rows.sort()
return (
[(pos, (row.TypeId, row.to_data())) for pos, row in rows],
to_token,
False,
)
class BaseFederationRow:
"""Base class for rows to be sent in the federation stream.
Specifies how to identify, serialize and deserialize the different types.
"""
TypeId = "" # Unique string that ids the type. Must be overridden in sub classes.
@staticmethod
def from_data(data):
"""Parse the data from the federation stream into a row.
Args:
data: The value of ``data`` from FederationStreamRow.data, type
depends on the type of stream
"""
raise NotImplementedError()
def to_data(self):
"""Serialize this row to be sent over the federation stream.
Returns:
The value to be sent in FederationStreamRow.data. The type depends
on the type of stream.
"""
raise NotImplementedError()
def add_to_buffer(self, buff):
"""Add this row to the appropriate field in the buffer ready for this
to be sent over federation.
We use a buffer so that we can batch up events that have come in at
the same time and send them all at once.
Args:
buff (BufferedToSend)
"""
raise NotImplementedError()
class PresenceRow(
BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState
):
TypeId = "p"
@staticmethod
def from_data(data):
return PresenceRow(state=UserPresenceState.from_dict(data))
def to_data(self):
return self.state.as_dict()
def add_to_buffer(self, buff):
buff.presence.append(self.state)
class PresenceDestinationsRow(
BaseFederationRow,
namedtuple(
"PresenceDestinationsRow",
("state", "destinations"), # UserPresenceState # list[str]
),
):
TypeId = "pd"
@staticmethod
def from_data(data):
return PresenceDestinationsRow(
state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
)
def to_data(self):
return {"state": self.state.as_dict(), "dests": self.destinations}
def add_to_buffer(self, buff):
buff.presence_destinations.append((self.state, self.destinations))
class KeyedEduRow(
BaseFederationRow,
namedtuple(
"KeyedEduRow",
("key", "edu"), # tuple(str) - the edu key passed to send_edu # Edu
),
):
"""Streams EDUs that have an associated key that is ued to clobber. For example,
typing EDUs clobber based on room_id.
"""
TypeId = "k"
@staticmethod
def from_data(data):
return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))
def to_data(self):
return {"key": self.key, "edu": self.edu.get_internal_dict()}
def add_to_buffer(self, buff):
buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
"""Streams EDUs that don't have keys. See KeyedEduRow"""
TypeId = "e"
@staticmethod
def from_data(data):
return EduRow(Edu(**data))
def to_data(self):
return self.edu.get_internal_dict()
def add_to_buffer(self, buff):
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
_rowtypes = (
PresenceRow,
PresenceDestinationsRow,
KeyedEduRow,
EduRow,
) # type: Tuple[Type[BaseFederationRow], ...]
TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
ParsedFederationStreamData = namedtuple(
"ParsedFederationStreamData",
(
"presence", # list(UserPresenceState)
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
),
)
def process_rows_for_federation(
transaction_queue: FederationSender,
rows: List[FederationStream.FederationStreamRow],
) -> None:
"""Parse a list of rows from the federation stream and put them in the
transaction queue ready for sending to the relevant homeservers.
Args:
transaction_queue
rows
"""
# The federation stream contains a bunch of different types of
# rows that need to be handled differently. We parse the rows, put
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
presence=[],
presence_destinations=[],
keyed_edus={},
edus={},
)
# Parse the rows in the stream and add to the buffer
for row in rows:
if row.type not in TypeToRow:
logger.error("Unrecognized federation row type %r", row.type)
continue
RowType = TypeToRow[row.type]
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)
if buff.presence:
transaction_queue.send_presence(buff.presence)
for state, destinations in buff.presence_destinations:
transaction_queue.send_presence_to_destinations(
states=[state], destinations=destinations
)
for destination, edu_map in buff.keyed_edus.items():
for key, edu in edu_map.items():
transaction_queue.send_edu(edu, key)
for destination, edu_list in buff.edus.items():
for edu in edu_list:
transaction_queue.send_edu(edu, None)

View File

@@ -24,8 +24,6 @@ from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.handlers.presence import get_interested_remotes
from synapse.logging.context import preserve_fn
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
@@ -34,7 +32,7 @@ from synapse.metrics import (
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
from synapse.util.metrics import Measure
if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
@@ -79,15 +77,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
"""
raise NotImplementedError()
@abc.abstractmethod
def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
"""
raise NotImplementedError()
@abc.abstractmethod
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
@@ -134,10 +123,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
def get_current_token(self) -> int:
raise NotImplementedError()
@abc.abstractmethod
def federation_ack(self, instance_name: str, token: int) -> None:
raise NotImplementedError()
@abc.abstractmethod
async def get_replication_rows(
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
@@ -176,11 +161,6 @@ class FederationSender(AbstractFederationSender):
),
)
# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {} # type: Dict[str, UserPresenceState]
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
@@ -201,8 +181,6 @@ class FederationSender(AbstractFederationSender):
self._is_processing = False
self._last_poked_id = -1
self._processing_pending_presence = False
# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
@@ -546,48 +524,6 @@ class FederationSender(AbstractFederationSender):
for queue in queues:
queue.flush_read_receipts_for_room(room_id)
@preserve_fn # the caller should not yield on this
async def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick succession are correctly handled.
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update(
{state.user_id: state for state in states if self.is_mine_id(state.user_id)}
)
# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return
self._processing_pending_presence = True
try:
while True:
states_map = self.pending_presence
self.pending_presence = {}
if not states_map:
break
await self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
@@ -608,40 +544,6 @@ class FederationSender(AbstractFederationSender):
continue
self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence")
async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
# We pull the presence router here instead of __init__
# to prevent a dependency cycle:
#
# AuthHandler -> Notifier -> FederationSender
# -> PresenceRouter -> ModuleApi -> AuthHandler
if self._presence_router is None:
self._presence_router = self.hs.get_presence_router()
assert self._presence_router is not None
hosts_and_states = await get_interested_remotes(
self.store,
self._presence_router,
states,
self.state,
)
for destinations, states in hosts_and_states:
for destination in destinations:
if destination == self.server_name:
continue
if not self._federation_shard_config.should_handle(
self._instance_name, destination
):
continue
self._get_per_destination_queue(destination).send_presence(states)
def build_and_send_edu(
self,
destination: str,
@@ -729,10 +631,6 @@ class FederationSender(AbstractFederationSender):
# to a worker.
return 0
def federation_ack(self, instance_name: str, token: int) -> None:
# It is not expected that this gets called on FederationSender.
raise NotImplementedError()
@staticmethod
async def get_replication_rows(
instance_name: str, from_token: int, to_token: int, target_row_count: int

View File

@@ -484,7 +484,7 @@ class DeviceHandler(DeviceWorkerHandler):
"device_list_key", position, users=[user_id], rooms=room_ids
)
if hosts:
if hosts and self.federation_sender:
logger.info(
"Sending device list update notif for %r to: %r", user_id, hosts
)

View File

@@ -51,9 +51,7 @@ class DeviceMessageHandler:
# same instance. Other federation sender instances will get notified by
# `synapse.app.generic_worker.FederationSenderHandler` when it sees it
# in the to-device replication stream.
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()
self.federation_sender = hs.get_federation_sender()
# If we can handle the to device EDUs we do so, otherwise we route them
# to the appropriate worker.

View File

@@ -24,6 +24,7 @@ The methods that define policy are:
import abc
import contextlib
import logging
from bisect import bisect
from contextlib import contextmanager
from typing import (
TYPE_CHECKING,
@@ -53,7 +54,9 @@ from synapse.replication.http.presence import (
ReplicationBumpPresenceActiveTime,
ReplicationPresenceSetState,
)
from synapse.replication.http.streams import ReplicationGetStreamUpdates
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.state import StateHandler
from synapse.storage.databases.main import DataStore
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
@@ -124,6 +127,8 @@ class BasePresenceHandler(abc.ABC):
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.federation_queue = PresenceFederationQueue(hs, self)
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
active_presence = self.store.take_presence_startup_info()
@@ -245,9 +250,17 @@ class BasePresenceHandler(abc.ABC):
"""
pass
async def process_replication_rows(self, token, rows):
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
):
"""Process presence stream rows received over replication."""
pass
await self.federation_queue.process_replication_rows(
stream_name, instance_name, token, rows
)
def get_federation_queue(self) -> "PresenceFederationQueue":
"""Get the presence federation queue, if any."""
return self.federation_queue
class _NullContextManager(ContextManager[None]):
@@ -265,6 +278,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
self.state = hs.get_state_handler()
# The number of ongoing syncs on this process, by user id.
# Empty if _presence_enabled is false.
@@ -273,6 +287,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()
self._federation = hs.get_federation_sender()
# user_id -> last_sync_ms. Lists the users that have stopped syncing
# but we haven't notified the master of that yet
self.users_going_offline = {}
@@ -388,7 +404,14 @@ class WorkerPresenceHandler(BasePresenceHandler):
users=users_to_states.keys(),
)
async def process_replication_rows(self, token, rows):
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
):
await super().process_replication_rows(stream_name, instance_name, token, rows)
if stream_name != PresenceStream.NAME:
return
states = [
UserPresenceState(
row.user_id,
@@ -408,6 +431,20 @@ class WorkerPresenceHandler(BasePresenceHandler):
stream_id = token
await self.notify_from_replication(states, stream_id)
# Handle poking the local federation sender, if there is one.
if not self._federation:
return
hosts_and_states = await get_interested_remotes(
self.store,
self.presence_router,
states,
self.state,
)
for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [
user_id
@@ -463,11 +500,12 @@ class PresenceHandler(BasePresenceHandler):
self.server_name = hs.hostname
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler()
self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
self.federation_sender = hs.get_federation_sender()
federation_registry = hs.get_federation_registry()
federation_registry.register_edu_handler("m.presence", self.incoming_presence)
@@ -680,7 +718,17 @@ class PresenceHandler(BasePresenceHandler):
if to_federation_ping:
federation_presence_out_counter.inc(len(to_federation_ping))
self._push_to_remotes(to_federation_ping.values())
hosts_and_states = await get_interested_remotes(
self.store,
self.presence_router,
list(to_federation_ping.values()),
self.state,
)
for destinations, states in hosts_and_states:
self.federation_queue.send_presence_to_destinations(
states, destinations
)
async def _handle_timeouts(self):
"""Checks the presence of users that have timed out and updates as
@@ -920,15 +968,21 @@ class PresenceHandler(BasePresenceHandler):
users=[UserID.from_string(u) for u in users_to_states],
)
self._push_to_remotes(states)
# We only need to tell the local federation sender, if any, that new
# presence has happened. Other federation senders will get notified via
# the presence replication stream.
if not self.federation_sender:
return
def _push_to_remotes(self, states):
"""Sends state updates to remote servers.
hosts_and_states = await get_interested_remotes(
self.store,
self.presence_router,
states,
self.state,
)
Args:
states (list(UserPresenceState))
"""
self.federation.send_presence(states)
for destinations, states in hosts_and_states:
self.federation_sender.send_presence_to_destinations(states, destinations)
async def incoming_presence(self, origin, content):
"""Called when we receive a `m.presence` EDU from a remote server."""
@@ -1166,7 +1220,7 @@ class PresenceHandler(BasePresenceHandler):
# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
self.federation.send_presence_to_destinations(
self.federation_queue.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)
@@ -1811,3 +1865,171 @@ async def get_interested_remotes(
hosts_and_states.append(([host], states))
return hosts_and_states
class PresenceFederationQueue:
"""Handles sending ad hoc presence updates over federation, which are *not*
due to state updates (that get handled via the presence stream), e.g.
federation pings and sending existing present states to newly joined hosts.
Only the last N minutes will be queued, so if a federation sender instance
is down for longer then some updates will be dropped. This is OK as presence
is ephemeral, and so it will self correct eventually.
"""
# How long to keep entries in the queue for. Workers that are down for
# longer than this duration will miss out on older updates.
_KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000
# How often to check if we can expire entries from the queue.
_CLEAR_ITEMS_EVERY_MS = 60 * 1000
def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
self._clock = hs.get_clock()
self._notifier = hs.get_notifier()
self._instance_name = hs.get_instance_name()
self._presence_handler = presence_handler
self._repl_client = ReplicationGetStreamUpdates.make_client(hs)
# Should we keep a queue of recent presence updates? We only bother if
# another process may be handling federation sending.
self._queue_presence_updates = True
# The federation sender if this instance is a federation sender.
self._federation = hs.get_federation_sender()
if self._federation:
# We don't bother queuing up presence states if only this instance
# is sending federation.
if hs.config.worker.federation_shard_config.instances == [
self._instance_name
]:
self._queue_presence_updates = False
# The queue of recently queued updates as tuples of: `(timestamp,
# stream_id, destinations, user_ids)`. We don't store the full states
# for efficiency, and remote workers will already have the full states
# cached.
self._queue = [] # type: List[Tuple[int, int, Collection[str], Set[str]]]
self._next_id = 1
# Map from instance name to current token
self._current_tokens = {} # type: Dict[str, int]
if self._queue_presence_updates:
self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS)
def _clear_queue(self):
"""Clear out older entries from the queue."""
clear_before = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS
# The queue is sorted by timestamp, so we can bisect to find the right
# place to purge before. Note that we are searching using a 1-tuple with
# the time, which does The Right Thing since the queue is a tuple where
# the first item is a timestamp.
index = bisect(self._queue, (clear_before,))
self._queue = self._queue[index:]
def send_presence_to_destinations(
self, states: Collection[UserPresenceState], destinations: Collection[str]
) -> None:
"""Send the presence states to the given destinations.
Will forward to the local federation sender (if there is one) and queue
to send over replication (if there are other federation sender instances.).
"""
if self._federation:
self._federation.send_presence_to_destinations(states, destinations)
if not self._queue_presence_updates:
return
now = self._clock.time_msec()
stream_id = self._next_id
self._next_id += 1
self._queue.append((now, stream_id, destinations, {s.user_id for s in states}))
self._notifier.notify_replication()
def get_current_token(self, instance_name: str) -> int:
if instance_name == self._instance_name:
return self._next_id - 1
else:
return self._current_tokens.get(instance_name, 0)
async def get_replication_rows(
self,
instance_name: str,
from_token: int,
upto_token: int,
target_row_count: int,
) -> Tuple[List[Tuple[int, Tuple[str, str]]], int, bool]:
"""Get all the updates between the two tokens.
We return rows in the form of `(destination, user_id)` to keep the size
of each row bounded (rather than returning the sets in a row).
"""
if instance_name != self._instance_name:
# If not local we query over replication.
result = await self._repl_client(
instance_name=instance_name,
stream_name=PresenceFederationStream.NAME,
from_token=from_token,
upto_token=upto_token,
)
return result["updates"], result["upto_token"], result["limited"]
# We can find the correct position in the queue by noting that there is
# exactly one entry per stream ID, and that the last entry has an ID of
# `self._next_id - 1`, so we can count backwards from the end.
#
# Since the start of the queue is periodically truncated we need to
# handle the case where `from_token` stream ID has already been dropped.
start_idx = max(from_token - self._next_id, -len(self._queue))
to_send = [] # type: List[Tuple[int, Tuple[str, str]]]
limited = False
new_id = upto_token
for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
if stream_id > upto_token:
break
new_id = stream_id
to_send.extend(
(stream_id, (destination, user_id))
for destination in destinations
for user_id in user_ids
)
if len(to_send) > target_row_count:
limited = True
break
return to_send, new_id, limited
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
):
if stream_name != PresenceFederationStream.NAME:
return
# We keep track of the current tokens
self._current_tokens[instance_name] = token
# If we're a federation sender we pull out the presence states to send
# and forward them on.
if not self._federation:
return
hosts_to_users = {} # type: Dict[str, Set[str]]
for row in rows:
hosts_to_users.setdefault(row.destination, set()).add(row.user_id)
for host, user_ids in hosts_to_users.items():
states = await self._presence_handler.current_state_for_users(user_ids)
self._federation.send_presence_to_destinations(states.values(), [host])

View File

@@ -36,9 +36,7 @@ class ReceiptsHandler(BaseHandler):
# same instance. Other federation sender instances will get notified by
# `synapse.app.generic_worker.FederationSenderHandler` when it sees it
# in the receipts stream.
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()
self.federation_sender = hs.get_federation_sender()
# If we can handle the receipt EDUs we do so, otherwise we route them
# to the appropriate worker.

View File

@@ -57,9 +57,7 @@ class FollowerTypingHandler:
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
self.federation = None
if hs.should_send_federation():
self.federation = hs.get_federation_sender()
self.federation = hs.get_federation_sender()
if hs.config.worker.writers.typing != hs.get_instance_name():
hs.get_federation_registry().register_instance_for_edu(

View File

@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Any, Generator, Iterable, List, Optional, Tupl
from twisted.internet import defer
from synapse.events import EventBase
from synapse.handlers.presence import get_interested_remotes
from synapse.http.client import SimpleHttpClient
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -50,6 +51,10 @@ class ModuleApi:
self._auth_handler = auth_handler
self._server_name = hs.hostname
self._presence_stream = hs.get_event_sources().sources["presence"]
self._state = hs.get_state_handler()
self._presence_router = hs.get_presence_router()
self._federation = self._hs.get_federation_sender()
# We expose these as properties below in order to attach a helpful docstring.
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
@@ -423,6 +428,9 @@ class ModuleApi:
# Force a presence initial_sync for this user next time
self._send_full_presence_to_local_users.add(user)
else:
if not self._federation:
continue
# Retrieve presence state for currently online users that this user
# is considered interested in
presence_events, _ = await self._presence_stream.get_new_events(
@@ -430,12 +438,16 @@ class ModuleApi:
)
# Send to remote destinations
await make_deferred_yieldable(
# We pull the federation sender here as we can only do so on workers
# that support sending presence
self._hs.get_federation_sender().send_presence(presence_events)
hosts_and_states = await get_interested_remotes(
self._store,
self._presence_router,
presence_events,
self._state,
)
for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)
class PublicRoomListManager:
"""Contains methods for adding to, removing from and querying whether a room

View File

@@ -227,9 +227,7 @@ class Notifier:
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()
self.federation_sender = hs.get_federation_sender()
self.state_handler = hs.get_state_handler()

View File

@@ -20,16 +20,13 @@ from twisted.internet.defer import Deferred
from twisted.internet.protocol import ReconnectingClientFactory
from synapse.api.constants import EventTypes
from synapse.federation import send_queue
from synapse.federation.sender import FederationSender
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.streams import (
AccountDataStream,
DeviceListsStream,
GroupServerStream,
PresenceStream,
PushersStream,
PushRulesStream,
ReceiptsStream,
@@ -191,8 +188,6 @@ class ReplicationDataHandler:
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
await self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == PresenceStream.NAME:
await self._presence_handler.process_replication_rows(token, rows)
elif stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
@@ -221,6 +216,10 @@ class ReplicationDataHandler:
membership=row.data.membership,
)
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
)
# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
# greater than the received row position.
@@ -338,6 +337,7 @@ class FederationSenderHandler:
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self._hs = hs
self._presence_handler = hs.get_presence_handler()
# We need to make a temporary value to ensure that mypy picks up the
# right type. We know we should have a federation sender instance since
@@ -356,14 +356,8 @@ class FederationSenderHandler:
self.federation_sender.wake_destination(server)
async def process_replication_rows(self, stream_name, token, rows):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
await self.update_token(token)
# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
if stream_name == ReceiptsStream.NAME:
await self._on_new_receipts(rows)
# ... as well as device updates and messages
@@ -401,54 +395,3 @@ class FederationSenderHandler:
receipt.data,
)
await self.federation_sender.send_read_receipt(receipt_info)
async def update_token(self, token):
"""Update the record of where we have processed to in the federation stream.
Called after we have processed a an update received over replication. Sends
a FEDERATION_ACK back to the master, and stores the token that we have processed
in `federation_stream_position` so that we can restart where we left off.
"""
self.federation_position = token
# We save and send the ACK to master asynchronously, so we don't block
# processing on persistence. We don't need to do this operation for
# every single RDATA we receive, we just need to do it periodically.
if self._fed_position_linearizer.is_queued(None):
# There is already a task queued up to save and send the token, so
# no need to queue up another task.
return
run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
async def _save_and_send_ack(self):
"""Save the current federation position in the database and send an ACK
to master with where we're up to.
"""
# We should only be calling this once we've got a token.
assert self.federation_position is not None
try:
# We linearize here to ensure we don't have races updating the token
#
# XXX this appears to be redundant, since the ReplicationCommandHandler
# has a linearizer which ensures that we only process one line of
# replication data at a time. Should we remove it, or is it doing useful
# service for robustness? Or could we replace it with an assertion that
# we're not being re-entered?
with (await self._fed_position_linearizer.queue(None)):
# We persist and ack the same position, so we take a copy of it
# here as otherwise it can get modified from underneath us.
current_position = self.federation_position
await self.store.update_federation_out_pos(
"federation", current_position
)
# We ACK this token over replication so that the master can drop
# its in memory queues
self._hs.get_tcp_replication().send_federation_ack(current_position)
except Exception:
logger.exception("Error updating federation stream position")

View File

@@ -297,33 +297,6 @@ class ClearUserSyncsCommand(Command):
return self.instance_id
class FederationAckCommand(Command):
"""Sent by the client when it has processed up to a given point in the
federation stream. This allows the master to drop in-memory caches of the
federation stream.
This must only be sent from one worker (i.e. the one sending federation)
Format::
FEDERATION_ACK <instance_name> <token>
"""
NAME = "FEDERATION_ACK"
def __init__(self, instance_name: str, token: int):
self.instance_name = instance_name
self.token = token
@classmethod
def from_line(cls, line: str) -> "FederationAckCommand":
instance_name, token = line.split(" ")
return cls(instance_name, int(token))
def to_line(self) -> str:
return "%s %s" % (self.instance_name, self.token)
class UserIpCommand(Command):
"""Sent periodically when a worker sees activity from a client.
@@ -389,7 +362,6 @@ _COMMANDS = (
NameCommand,
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
@@ -415,7 +387,6 @@ VALID_CLIENT_COMMANDS = (
PingCommand.NAME,
UserSyncCommand.NAME,
ClearUserSyncsCommand.NAME,
FederationAckCommand.NAME,
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,

View File

@@ -39,7 +39,6 @@ from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
FederationAckCommand,
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
@@ -54,7 +53,6 @@ from synapse.replication.tcp.streams import (
BackfillStream,
CachesStream,
EventsStream,
FederationStream,
ReceiptsStream,
Stream,
TagAccountDataStream,
@@ -73,7 +71,6 @@ inbound_rdata_count = Counter(
"synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
)
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
@@ -157,11 +154,6 @@ class ReplicationCommandHandler:
if hs.config.worker_app is not None:
continue
if stream.NAME == FederationStream.NAME and hs.config.send_federation:
# We only support federation stream if federation sending
# has been disabled on the master.
continue
self._streams_to_replicate.append(stream)
# Map of stream name to batched updates. See RdataCommand for info on
@@ -365,14 +357,6 @@ class ReplicationCommandHandler:
else:
return None
def on_FEDERATION_ACK(
self, conn: IReplicationConnection, cmd: FederationAckCommand
):
federation_ack_counter.inc()
if self._federation_sender:
self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
def on_USER_IP(
self, conn: IReplicationConnection, cmd: UserIpCommand
) -> Optional[Awaitable[None]]:
@@ -655,12 +639,6 @@ class ReplicationCommandHandler:
else:
logger.warning("Dropping command as not connected: %r", cmd.NAME)
def send_federation_ack(self, token: int):
"""Ack data for the federation stream. This allows the master to drop
data stored purely in memory.
"""
self.send_command(FederationAckCommand(self._instance_name, token))
def send_user_sync(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
):

View File

@@ -30,6 +30,7 @@ from synapse.replication.tcp.streams._base import (
CachesStream,
DeviceListsStream,
GroupServerStream,
PresenceFederationStream,
PresenceStream,
PublicRoomsStream,
PushersStream,
@@ -42,7 +43,6 @@ from synapse.replication.tcp.streams._base import (
UserSignatureStream,
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
STREAMS_MAP = {
stream.NAME: stream
@@ -50,6 +50,7 @@ STREAMS_MAP = {
EventsStream,
BackfillStream,
PresenceStream,
PresenceFederationStream,
TypingStream,
ReceiptsStream,
PushRulesStream,
@@ -58,7 +59,6 @@ STREAMS_MAP = {
PublicRoomsStream,
DeviceListsStream,
ToDeviceStream,
FederationStream,
TagAccountDataStream,
AccountDataStream,
GroupServerStream,
@@ -71,6 +71,7 @@ __all__ = [
"Stream",
"BackfillStream",
"PresenceStream",
"PresenceFederationStream",
"TypingStream",
"ReceiptsStream",
"PushRulesStream",

View File

@@ -290,6 +290,30 @@ class PresenceStream(Stream):
)
class PresenceFederationStream(Stream):
"""A stream used to send ad hoc presence updates over federation.
Streams the remote destination and the user ID of the presence state to
send.
"""
@attr.s(slots=True, auto_attribs=True)
class PresenceFederationStreamRow:
destination: str
user_id: str
NAME = "presence_federation"
ROW_TYPE = PresenceFederationStreamRow
def __init__(self, hs: "HomeServer"):
federation_queue = hs.get_presence_handler().get_federation_queue()
super().__init__(
hs.get_instance_name(),
federation_queue.get_current_token,
federation_queue.get_replication_rows,
)
class TypingStream(Stream):
TypingStreamRow = namedtuple(
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)

View File

@@ -1,80 +0,0 @@
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
from synapse.replication.tcp.streams._base import (
Stream,
current_token_without_instance,
make_http_update_function,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
FederationStreamRow = namedtuple(
"FederationStreamRow",
(
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
),
)
NAME = "federation"
ROW_TYPE = FederationStreamRow
def __init__(self, hs: "HomeServer"):
if hs.config.worker_app is None:
# master process: get updates from the FederationRemoteSendQueue.
# (if the master is configured to send federation itself, federation_sender
# will be a real FederationSender, which has stubs for current_token and
# get_replication_rows.)
federation_sender = hs.get_federation_sender()
current_token = current_token_without_instance(
federation_sender.get_current_token
)
update_function = (
federation_sender.get_replication_rows
) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
elif hs.should_send_federation():
# federation sender: Query master process
update_function = make_http_update_function(hs, self.NAME)
current_token = self._stub_current_token
else:
# other worker: stub out the update function (we're not interested in
# any updates so when we get a POSITION we do nothing)
update_function = self._stub_update_function
current_token = self._stub_current_token
super().__init__(hs.get_instance_name(), current_token, update_function)
@staticmethod
def _stub_current_token(instance_name: str) -> int:
# dummy current-token method for use on workers
return 0
@staticmethod
async def _stub_update_function(
instance_name: str, from_token: int, upto_token: int, limit: int
) -> Tuple[list, int, bool]:
return [], upto_token, False

View File

@@ -59,7 +59,6 @@ from synapse.federation.federation_server import (
FederationHandlerRegistry,
FederationServer,
)
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.federation.transport.client import TransportLayerClient
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
@@ -580,13 +579,11 @@ class HomeServer(metaclass=abc.ABCMeta):
return TransportLayerClient(self)
@cache_in_self
def get_federation_sender(self) -> AbstractFederationSender:
def get_federation_sender(self) -> Optional[AbstractFederationSender]:
if self.should_send_federation():
return FederationSender(self)
elif not self.config.worker_app:
return FederationRemoteSendQueue(self)
else:
raise Exception("Workers cannot send federation traffic")
return None
@cache_in_self
def get_receipts_handler(self) -> ReceiptsHandler:

View File

@@ -471,6 +471,168 @@ class PresenceHandlerTestCase(unittest.HomeserverTestCase):
self.assertEqual(state.state, PresenceState.OFFLINE)
class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
self.instance_name = hs.get_instance_name()
self.queue = self.presence_handler.get_federation_queue()
def test_send_and_get(self):
state1 = UserPresenceState.default("@user1:test")
state2 = UserPresenceState.default("@user2:test")
state3 = UserPresenceState.default("@user3:test")
prev_token = self.queue.get_current_token(self.instance_name)
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
self.queue.send_presence_to_destinations((state3,), ("dest3",))
now_token = self.queue.get_current_token(self.instance_name)
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", prev_token, now_token, 10)
)
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
expected_rows = [
(1, ("dest1", "@user1:test")),
(1, ("dest2", "@user1:test")),
(1, ("dest1", "@user2:test")),
(1, ("dest2", "@user2:test")),
(2, ("dest3", "@user3:test")),
]
self.assertCountEqual(rows, expected_rows)
def test_send_and_get_split(self):
state1 = UserPresenceState.default("@user1:test")
state2 = UserPresenceState.default("@user2:test")
state3 = UserPresenceState.default("@user3:test")
prev_token = self.queue.get_current_token(self.instance_name)
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
now_token = self.queue.get_current_token(self.instance_name)
self.queue.send_presence_to_destinations((state3,), ("dest3",))
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", prev_token, now_token, 10)
)
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
expected_rows = [
(1, ("dest1", "@user1:test")),
(1, ("dest2", "@user1:test")),
(1, ("dest1", "@user2:test")),
(1, ("dest2", "@user2:test")),
]
self.assertCountEqual(rows, expected_rows)
def test_clear_queue_all(self):
state1 = UserPresenceState.default("@user1:test")
state2 = UserPresenceState.default("@user2:test")
state3 = UserPresenceState.default("@user3:test")
prev_token = self.queue.get_current_token(self.instance_name)
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
self.queue.send_presence_to_destinations((state3,), ("dest3",))
self.reactor.advance(10 * 60 * 1000)
now_token = self.queue.get_current_token(self.instance_name)
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", prev_token, now_token, 10)
)
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
self.assertCountEqual(rows, [])
prev_token = self.queue.get_current_token(self.instance_name)
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
self.queue.send_presence_to_destinations((state3,), ("dest3",))
now_token = self.queue.get_current_token(self.instance_name)
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", prev_token, now_token, 10)
)
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
expected_rows = [
(3, ("dest1", "@user1:test")),
(3, ("dest2", "@user1:test")),
(3, ("dest1", "@user2:test")),
(3, ("dest2", "@user2:test")),
(4, ("dest3", "@user3:test")),
]
self.assertCountEqual(rows, expected_rows)
def test_partially_clear_queue(self):
state1 = UserPresenceState.default("@user1:test")
state2 = UserPresenceState.default("@user2:test")
state3 = UserPresenceState.default("@user3:test")
prev_token = self.queue.get_current_token(self.instance_name)
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
self.reactor.advance(2 * 60 * 1000)
self.queue.send_presence_to_destinations((state3,), ("dest3",))
self.reactor.advance(4 * 60 * 1000)
now_token = self.queue.get_current_token(self.instance_name)
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", prev_token, now_token, 10)
)
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
expected_rows = [
(2, ("dest3", "@user3:test")),
]
self.assertCountEqual(rows, [])
prev_token = self.queue.get_current_token(self.instance_name)
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
self.queue.send_presence_to_destinations((state3,), ("dest3",))
now_token = self.queue.get_current_token(self.instance_name)
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", prev_token, now_token, 10)
)
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
expected_rows = [
(3, ("dest1", "@user1:test")),
(3, ("dest2", "@user1:test")),
(3, ("dest1", "@user2:test")),
(3, ("dest2", "@user2:test")),
(4, ("dest3", "@user3:test")),
]
self.assertCountEqual(rows, expected_rows)
class PresenceJoinTestCase(unittest.HomeserverTestCase):
"""Tests remote servers get told about presence of users in the room when
they join and when new local users join.

View File

@@ -1,80 +0,0 @@
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.federation.send_queue import EduRow
from synapse.replication.tcp.streams.federation import FederationStream
from tests.replication._base import BaseStreamTestCase
class FederationStreamTestCase(BaseStreamTestCase):
def _get_worker_hs_config(self) -> dict:
# enable federation sending on the worker
config = super()._get_worker_hs_config()
# TODO: make it so we don't need both of these
config["send_federation"] = False
config["worker_app"] = "synapse.app.federation_sender"
return config
def test_catchup(self):
"""Basic test of catchup on reconnect
Makes sure that updates sent while we are offline are received later.
"""
fed_sender = self.hs.get_federation_sender()
received_rows = self.test_handler.received_rdata_rows
fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
self.reconnect()
self.reactor.advance(0)
# check we're testing what we think we are: no rows should yet have been
# received
self.assertEqual(received_rows, [])
# We should now see an attempt to connect to the master
request = self.handle_http_replication_attempt()
self.assert_request_is_get_repl_stream_updates(request, "federation")
# we should have received an update row
stream_name, token, row = received_rows.pop()
self.assertEqual(stream_name, "federation")
self.assertIsInstance(row, FederationStream.FederationStreamRow)
self.assertEqual(row.type, EduRow.TypeId)
edurow = EduRow.from_data(row.data)
self.assertEqual(edurow.edu.edu_type, "m.test_edu")
self.assertEqual(edurow.edu.origin, self.hs.hostname)
self.assertEqual(edurow.edu.destination, "testdest")
self.assertEqual(edurow.edu.content, {"a": "b"})
self.assertEqual(received_rows, [])
# additional updates should be transferred without an HTTP hit
fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
self.reactor.advance(0)
# there should be no http hit
self.assertEqual(len(self.reactor.tcpClients), 0)
# ... but we should have a row
self.assertEqual(len(received_rows), 1)
stream_name, token, row = received_rows.pop()
self.assertEqual(stream_name, "federation")
self.assertIsInstance(row, FederationStream.FederationStreamRow)
self.assertEqual(row.type, EduRow.TypeId)
edurow = EduRow.from_data(row.data)
self.assertEqual(edurow.edu.edu_type, "m.test1")
self.assertEqual(edurow.edu.origin, self.hs.hostname)
self.assertEqual(edurow.edu.destination, "testdest")
self.assertEqual(edurow.edu.content, {"c": "d"})

View File

@@ -1,73 +0,0 @@
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from synapse.app.generic_worker import GenericWorkerServer
from synapse.replication.tcp.commands import FederationAckCommand
from synapse.replication.tcp.protocol import IReplicationConnection
from synapse.replication.tcp.streams.federation import FederationStream
from tests.unittest import HomeserverTestCase
class FederationAckTestCase(HomeserverTestCase):
def default_config(self) -> dict:
config = super().default_config()
config["worker_app"] = "synapse.app.federation_sender"
config["send_federation"] = False
return config
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
return hs
def test_federation_ack_sent(self):
"""A FEDERATION_ACK should be sent back after each RDATA federation
This test checks that the federation sender is correctly sending back
FEDERATION_ACK messages. The test works by spinning up a federation_sender
worker server, and then fishing out its ReplicationCommandHandler. We wire
the RCH up to a mock connection (so that we can observe the command being sent)
and then poke in an RDATA row.
XXX: it might be nice to do this by pretending to be a synapse master worker
(or a redis server), and having the worker connect to us via a mocked-up TCP
transport, rather than assuming that the implementation has a
ReplicationCommandHandler.
"""
rch = self.hs.get_tcp_replication()
# wire up the ReplicationCommandHandler to a mock connection, which needs
# to implement IReplicationConnection. (Note that Mock doesn't understand
# interfaces, but casing an interface to a list gives the attributes.)
mock_connection = mock.Mock(spec=list(IReplicationConnection))
rch.new_connection(mock_connection)
# tell it it received an RDATA row
self.get_success(
rch.on_rdata(
"federation",
"master",
token=10,
rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
)
)
# now check that the FEDERATION_ACK was sent
mock_connection.send_command.assert_called_once()
cmd = mock_connection.send_command.call_args[0][0]
assert isinstance(cmd, FederationAckCommand)
self.assertEqual(cmd.token, 10)