mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
6 Commits
erikj/rust
...
erikj/remo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
68b9eb694f | ||
|
|
9f1a20f0c2 | ||
|
|
7575a686fd | ||
|
|
40bc8774c8 | ||
|
|
5c63b653c8 | ||
|
|
8f566077fb |
1
changelog.d/9819.feature
Normal file
1
changelog.d/9819.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add experimental support for handling presence on a worker.
|
||||
@@ -216,10 +216,6 @@ Asks the server for the current position of all streams.
|
||||
|
||||
This is used when a worker is shutting down.
|
||||
|
||||
#### FEDERATION_ACK (C)
|
||||
|
||||
Acknowledge receipt of some federation data
|
||||
|
||||
### REMOTE_SERVER_UP (S, C)
|
||||
|
||||
Inform other processes that a remote server may have come back online.
|
||||
|
||||
@@ -1,576 +0,0 @@
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""A federation sender that forwards things to be sent across replication to
|
||||
a worker process.
|
||||
|
||||
It assumes there is a single worker process feeding off of it.
|
||||
|
||||
Each row in the replication stream consists of a type and some json, where the
|
||||
types indicate whether they are presence, or edus, etc.
|
||||
|
||||
Ephemeral or non-event data are queued up in-memory. When the worker requests
|
||||
updates since a particular point, all in-memory data since before that point is
|
||||
dropped. We also expire things in the queue after 5 minutes, to ensure that a
|
||||
dead worker doesn't cause the queues to grow limitlessly.
|
||||
|
||||
Events are replicated via a separate events stream.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Dict,
|
||||
Hashable,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Sized,
|
||||
Tuple,
|
||||
Type,
|
||||
)
|
||||
|
||||
from sortedcontainers import SortedDict
|
||||
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.federation.sender import AbstractFederationSender, FederationSender
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.replication.tcp.streams.federation import FederationStream
|
||||
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
from .units import Edu
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FederationRemoteSendQueue(AbstractFederationSender):
|
||||
"""A drop in replacement for FederationSender"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.server_name = hs.hostname
|
||||
self.clock = hs.get_clock()
|
||||
self.notifier = hs.get_notifier()
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
|
||||
# We may have multiple federation sender instances, so we need to track
|
||||
# their positions separately.
|
||||
self._sender_instances = hs.config.worker.federation_shard_config.instances
|
||||
self._sender_positions = {} # type: Dict[str, int]
|
||||
|
||||
# Pending presence map user_id -> UserPresenceState
|
||||
self.presence_map = {} # type: Dict[str, UserPresenceState]
|
||||
|
||||
# Stream position -> list[user_id]
|
||||
self.presence_changed = SortedDict() # type: SortedDict[int, List[str]]
|
||||
|
||||
# Stores the destinations we need to explicitly send presence to about a
|
||||
# given user.
|
||||
# Stream position -> (user_id, destinations)
|
||||
self.presence_destinations = (
|
||||
SortedDict()
|
||||
) # type: SortedDict[int, Tuple[str, Iterable[str]]]
|
||||
|
||||
# (destination, key) -> EDU
|
||||
self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
|
||||
|
||||
# stream position -> (destination, key)
|
||||
self.keyed_edu_changed = (
|
||||
SortedDict()
|
||||
) # type: SortedDict[int, Tuple[str, tuple]]
|
||||
|
||||
self.edus = SortedDict() # type: SortedDict[int, Edu]
|
||||
|
||||
# stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
|
||||
self.pos = 1
|
||||
|
||||
# map from stream ID to the time that stream entry was generated, so that we
|
||||
# can clear out entries after a while
|
||||
self.pos_time = SortedDict() # type: SortedDict[int, int]
|
||||
|
||||
# EVERYTHING IS SAD. In particular, python only makes new scopes when
|
||||
# we make a new function, so we need to make a new function so the inner
|
||||
# lambda binds to the queue rather than to the name of the queue which
|
||||
# changes. ARGH.
|
||||
def register(name: str, queue: Sized) -> None:
|
||||
LaterGauge(
|
||||
"synapse_federation_send_queue_%s_size" % (queue_name,),
|
||||
"",
|
||||
[],
|
||||
lambda: len(queue),
|
||||
)
|
||||
|
||||
for queue_name in [
|
||||
"presence_map",
|
||||
"presence_changed",
|
||||
"keyed_edu",
|
||||
"keyed_edu_changed",
|
||||
"edus",
|
||||
"pos_time",
|
||||
"presence_destinations",
|
||||
]:
|
||||
register(queue_name, getattr(self, queue_name))
|
||||
|
||||
self.clock.looping_call(self._clear_queue, 30 * 1000)
|
||||
|
||||
def _next_pos(self) -> int:
|
||||
pos = self.pos
|
||||
self.pos += 1
|
||||
self.pos_time[self.clock.time_msec()] = pos
|
||||
return pos
|
||||
|
||||
def _clear_queue(self) -> None:
|
||||
"""Clear the queues for anything older than N minutes"""
|
||||
|
||||
FIVE_MINUTES_AGO = 5 * 60 * 1000
|
||||
now = self.clock.time_msec()
|
||||
|
||||
keys = self.pos_time.keys()
|
||||
time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
|
||||
if not keys[:time]:
|
||||
return
|
||||
|
||||
position_to_delete = max(keys[:time])
|
||||
for key in keys[:time]:
|
||||
del self.pos_time[key]
|
||||
|
||||
self._clear_queue_before_pos(position_to_delete)
|
||||
|
||||
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
|
||||
"""Clear all the queues from before a given position"""
|
||||
with Measure(self.clock, "send_queue._clear"):
|
||||
# Delete things out of presence maps
|
||||
keys = self.presence_changed.keys()
|
||||
i = self.presence_changed.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.presence_changed[key]
|
||||
|
||||
user_ids = {
|
||||
user_id for uids in self.presence_changed.values() for user_id in uids
|
||||
}
|
||||
|
||||
keys = self.presence_destinations.keys()
|
||||
i = self.presence_destinations.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.presence_destinations[key]
|
||||
|
||||
user_ids.update(
|
||||
user_id for user_id, _ in self.presence_destinations.values()
|
||||
)
|
||||
|
||||
to_del = [
|
||||
user_id for user_id in self.presence_map if user_id not in user_ids
|
||||
]
|
||||
for user_id in to_del:
|
||||
del self.presence_map[user_id]
|
||||
|
||||
# Delete things out of keyed edus
|
||||
keys = self.keyed_edu_changed.keys()
|
||||
i = self.keyed_edu_changed.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.keyed_edu_changed[key]
|
||||
|
||||
live_keys = set()
|
||||
for edu_key in self.keyed_edu_changed.values():
|
||||
live_keys.add(edu_key)
|
||||
|
||||
keys_to_del = [
|
||||
edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
|
||||
]
|
||||
for edu_key in keys_to_del:
|
||||
del self.keyed_edu[edu_key]
|
||||
|
||||
# Delete things out of edu map
|
||||
keys = self.edus.keys()
|
||||
i = self.edus.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.edus[key]
|
||||
|
||||
def notify_new_events(self, max_token: RoomStreamToken) -> None:
|
||||
"""As per FederationSender"""
|
||||
# This should never get called.
|
||||
raise NotImplementedError()
|
||||
|
||||
def build_and_send_edu(
|
||||
self,
|
||||
destination: str,
|
||||
edu_type: str,
|
||||
content: JsonDict,
|
||||
key: Optional[Hashable] = None,
|
||||
) -> None:
|
||||
"""As per FederationSender"""
|
||||
if destination == self.server_name:
|
||||
logger.info("Not sending EDU to ourselves")
|
||||
return
|
||||
|
||||
pos = self._next_pos()
|
||||
|
||||
edu = Edu(
|
||||
origin=self.server_name,
|
||||
destination=destination,
|
||||
edu_type=edu_type,
|
||||
content=content,
|
||||
)
|
||||
|
||||
if key:
|
||||
assert isinstance(key, tuple)
|
||||
self.keyed_edu[(destination, key)] = edu
|
||||
self.keyed_edu_changed[pos] = (destination, key)
|
||||
else:
|
||||
self.edus[pos] = edu
|
||||
|
||||
self.notifier.on_new_replication_data()
|
||||
|
||||
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
|
||||
"""As per FederationSender
|
||||
|
||||
Args:
|
||||
receipt:
|
||||
"""
|
||||
# nothing to do here: the replication listener will handle it.
|
||||
|
||||
def send_presence(self, states: List[UserPresenceState]) -> None:
|
||||
"""As per FederationSender
|
||||
|
||||
Args:
|
||||
states
|
||||
"""
|
||||
pos = self._next_pos()
|
||||
|
||||
# We only want to send presence for our own users, so lets always just
|
||||
# filter here just in case.
|
||||
local_states = [s for s in states if self.is_mine_id(s.user_id)]
|
||||
|
||||
self.presence_map.update({state.user_id: state for state in local_states})
|
||||
self.presence_changed[pos] = [state.user_id for state in local_states]
|
||||
|
||||
self.notifier.on_new_replication_data()
|
||||
|
||||
def send_presence_to_destinations(
|
||||
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
||||
) -> None:
|
||||
"""As per FederationSender
|
||||
|
||||
Args:
|
||||
states
|
||||
destinations
|
||||
"""
|
||||
for state in states:
|
||||
pos = self._next_pos()
|
||||
self.presence_map.update({state.user_id: state for state in states})
|
||||
self.presence_destinations[pos] = (state.user_id, destinations)
|
||||
|
||||
self.notifier.on_new_replication_data()
|
||||
|
||||
def send_device_messages(self, destination: str) -> None:
|
||||
"""As per FederationSender"""
|
||||
# We don't need to replicate this as it gets sent down a different
|
||||
# stream.
|
||||
|
||||
def wake_destination(self, server: str) -> None:
|
||||
pass
|
||||
|
||||
def get_current_token(self) -> int:
|
||||
return self.pos - 1
|
||||
|
||||
def federation_ack(self, instance_name: str, token: int) -> None:
|
||||
if self._sender_instances:
|
||||
# If we have configured multiple federation sender instances we need
|
||||
# to track their positions separately, and only clear the queue up
|
||||
# to the token all instances have acked.
|
||||
self._sender_positions[instance_name] = token
|
||||
token = min(self._sender_positions.values())
|
||||
|
||||
self._clear_queue_before_pos(token)
|
||||
|
||||
async def get_replication_rows(
|
||||
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
|
||||
) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
|
||||
"""Get rows to be sent over federation between the two tokens
|
||||
|
||||
Args:
|
||||
instance_name: the name of the current process
|
||||
from_token: the previous stream token: the starting point for fetching the
|
||||
updates
|
||||
to_token: the new stream token: the point to get updates up to
|
||||
target_row_count: a target for the number of rows to be returned.
|
||||
|
||||
Returns: a triplet `(updates, new_last_token, limited)`, where:
|
||||
* `updates` is a list of `(token, row)` entries.
|
||||
* `new_last_token` is the new position in stream.
|
||||
* `limited` is whether there are more updates to fetch.
|
||||
"""
|
||||
# TODO: Handle target_row_count.
|
||||
|
||||
# To handle restarts where we wrap around
|
||||
if from_token > self.pos:
|
||||
from_token = -1
|
||||
|
||||
# list of tuple(int, BaseFederationRow), where the first is the position
|
||||
# of the federation stream.
|
||||
rows = [] # type: List[Tuple[int, BaseFederationRow]]
|
||||
|
||||
# Fetch changed presence
|
||||
i = self.presence_changed.bisect_right(from_token)
|
||||
j = self.presence_changed.bisect_right(to_token) + 1
|
||||
dest_user_ids = [
|
||||
(pos, user_id)
|
||||
for pos, user_id_list in self.presence_changed.items()[i:j]
|
||||
for user_id in user_id_list
|
||||
]
|
||||
|
||||
for (key, user_id) in dest_user_ids:
|
||||
rows.append((key, PresenceRow(state=self.presence_map[user_id])))
|
||||
|
||||
# Fetch presence to send to destinations
|
||||
i = self.presence_destinations.bisect_right(from_token)
|
||||
j = self.presence_destinations.bisect_right(to_token) + 1
|
||||
|
||||
for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
|
||||
rows.append(
|
||||
(
|
||||
pos,
|
||||
PresenceDestinationsRow(
|
||||
state=self.presence_map[user_id], destinations=list(dests)
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
# Fetch changes keyed edus
|
||||
i = self.keyed_edu_changed.bisect_right(from_token)
|
||||
j = self.keyed_edu_changed.bisect_right(to_token) + 1
|
||||
# We purposefully clobber based on the key here, python dict comprehensions
|
||||
# always use the last value, so this will correctly point to the last
|
||||
# stream position.
|
||||
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
|
||||
|
||||
for ((destination, edu_key), pos) in keyed_edus.items():
|
||||
rows.append(
|
||||
(
|
||||
pos,
|
||||
KeyedEduRow(
|
||||
key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
# Fetch changed edus
|
||||
i = self.edus.bisect_right(from_token)
|
||||
j = self.edus.bisect_right(to_token) + 1
|
||||
edus = self.edus.items()[i:j]
|
||||
|
||||
for (pos, edu) in edus:
|
||||
rows.append((pos, EduRow(edu)))
|
||||
|
||||
# Sort rows based on pos
|
||||
rows.sort()
|
||||
|
||||
return (
|
||||
[(pos, (row.TypeId, row.to_data())) for pos, row in rows],
|
||||
to_token,
|
||||
False,
|
||||
)
|
||||
|
||||
|
||||
class BaseFederationRow:
|
||||
"""Base class for rows to be sent in the federation stream.
|
||||
|
||||
Specifies how to identify, serialize and deserialize the different types.
|
||||
"""
|
||||
|
||||
TypeId = "" # Unique string that ids the type. Must be overridden in sub classes.
|
||||
|
||||
@staticmethod
|
||||
def from_data(data):
|
||||
"""Parse the data from the federation stream into a row.
|
||||
|
||||
Args:
|
||||
data: The value of ``data`` from FederationStreamRow.data, type
|
||||
depends on the type of stream
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def to_data(self):
|
||||
"""Serialize this row to be sent over the federation stream.
|
||||
|
||||
Returns:
|
||||
The value to be sent in FederationStreamRow.data. The type depends
|
||||
on the type of stream.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def add_to_buffer(self, buff):
|
||||
"""Add this row to the appropriate field in the buffer ready for this
|
||||
to be sent over federation.
|
||||
|
||||
We use a buffer so that we can batch up events that have come in at
|
||||
the same time and send them all at once.
|
||||
|
||||
Args:
|
||||
buff (BufferedToSend)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class PresenceRow(
|
||||
BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState
|
||||
):
|
||||
TypeId = "p"
|
||||
|
||||
@staticmethod
|
||||
def from_data(data):
|
||||
return PresenceRow(state=UserPresenceState.from_dict(data))
|
||||
|
||||
def to_data(self):
|
||||
return self.state.as_dict()
|
||||
|
||||
def add_to_buffer(self, buff):
|
||||
buff.presence.append(self.state)
|
||||
|
||||
|
||||
class PresenceDestinationsRow(
|
||||
BaseFederationRow,
|
||||
namedtuple(
|
||||
"PresenceDestinationsRow",
|
||||
("state", "destinations"), # UserPresenceState # list[str]
|
||||
),
|
||||
):
|
||||
TypeId = "pd"
|
||||
|
||||
@staticmethod
|
||||
def from_data(data):
|
||||
return PresenceDestinationsRow(
|
||||
state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
|
||||
)
|
||||
|
||||
def to_data(self):
|
||||
return {"state": self.state.as_dict(), "dests": self.destinations}
|
||||
|
||||
def add_to_buffer(self, buff):
|
||||
buff.presence_destinations.append((self.state, self.destinations))
|
||||
|
||||
|
||||
class KeyedEduRow(
|
||||
BaseFederationRow,
|
||||
namedtuple(
|
||||
"KeyedEduRow",
|
||||
("key", "edu"), # tuple(str) - the edu key passed to send_edu # Edu
|
||||
),
|
||||
):
|
||||
"""Streams EDUs that have an associated key that is ued to clobber. For example,
|
||||
typing EDUs clobber based on room_id.
|
||||
"""
|
||||
|
||||
TypeId = "k"
|
||||
|
||||
@staticmethod
|
||||
def from_data(data):
|
||||
return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))
|
||||
|
||||
def to_data(self):
|
||||
return {"key": self.key, "edu": self.edu.get_internal_dict()}
|
||||
|
||||
def add_to_buffer(self, buff):
|
||||
buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
|
||||
|
||||
|
||||
class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
|
||||
"""Streams EDUs that don't have keys. See KeyedEduRow"""
|
||||
|
||||
TypeId = "e"
|
||||
|
||||
@staticmethod
|
||||
def from_data(data):
|
||||
return EduRow(Edu(**data))
|
||||
|
||||
def to_data(self):
|
||||
return self.edu.get_internal_dict()
|
||||
|
||||
def add_to_buffer(self, buff):
|
||||
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
|
||||
|
||||
|
||||
_rowtypes = (
|
||||
PresenceRow,
|
||||
PresenceDestinationsRow,
|
||||
KeyedEduRow,
|
||||
EduRow,
|
||||
) # type: Tuple[Type[BaseFederationRow], ...]
|
||||
|
||||
TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
|
||||
|
||||
|
||||
ParsedFederationStreamData = namedtuple(
|
||||
"ParsedFederationStreamData",
|
||||
(
|
||||
"presence", # list(UserPresenceState)
|
||||
"presence_destinations", # list of tuples of UserPresenceState and destinations
|
||||
"keyed_edus", # dict of destination -> { key -> Edu }
|
||||
"edus", # dict of destination -> [Edu]
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def process_rows_for_federation(
|
||||
transaction_queue: FederationSender,
|
||||
rows: List[FederationStream.FederationStreamRow],
|
||||
) -> None:
|
||||
"""Parse a list of rows from the federation stream and put them in the
|
||||
transaction queue ready for sending to the relevant homeservers.
|
||||
|
||||
Args:
|
||||
transaction_queue
|
||||
rows
|
||||
"""
|
||||
|
||||
# The federation stream contains a bunch of different types of
|
||||
# rows that need to be handled differently. We parse the rows, put
|
||||
# them into the appropriate collection and then send them off.
|
||||
|
||||
buff = ParsedFederationStreamData(
|
||||
presence=[],
|
||||
presence_destinations=[],
|
||||
keyed_edus={},
|
||||
edus={},
|
||||
)
|
||||
|
||||
# Parse the rows in the stream and add to the buffer
|
||||
for row in rows:
|
||||
if row.type not in TypeToRow:
|
||||
logger.error("Unrecognized federation row type %r", row.type)
|
||||
continue
|
||||
|
||||
RowType = TypeToRow[row.type]
|
||||
parsed_row = RowType.from_data(row.data)
|
||||
parsed_row.add_to_buffer(buff)
|
||||
|
||||
if buff.presence:
|
||||
transaction_queue.send_presence(buff.presence)
|
||||
|
||||
for state, destinations in buff.presence_destinations:
|
||||
transaction_queue.send_presence_to_destinations(
|
||||
states=[state], destinations=destinations
|
||||
)
|
||||
|
||||
for destination, edu_map in buff.keyed_edus.items():
|
||||
for key, edu in edu_map.items():
|
||||
transaction_queue.send_edu(edu, key)
|
||||
|
||||
for destination, edu_list in buff.edus.items():
|
||||
for edu in edu_list:
|
||||
transaction_queue.send_edu(edu, None)
|
||||
@@ -24,8 +24,6 @@ from synapse.events import EventBase
|
||||
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
|
||||
from synapse.federation.sender.transaction_manager import TransactionManager
|
||||
from synapse.federation.units import Edu
|
||||
from synapse.handlers.presence import get_interested_remotes
|
||||
from synapse.logging.context import preserve_fn
|
||||
from synapse.metrics import (
|
||||
LaterGauge,
|
||||
event_processing_loop_counter,
|
||||
@@ -34,7 +32,7 @@ from synapse.metrics import (
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
|
||||
from synapse.util.metrics import Measure, measure_func
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.events.presence_router import PresenceRouter
|
||||
@@ -79,15 +77,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_presence(self, states: List[UserPresenceState]) -> None:
|
||||
"""Send the new presence states to the appropriate destinations.
|
||||
|
||||
This actually queues up the presence states ready for sending and
|
||||
triggers a background task to process them and send out the transactions.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_presence_to_destinations(
|
||||
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
||||
@@ -134,10 +123,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
|
||||
def get_current_token(self) -> int:
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def federation_ack(self, instance_name: str, token: int) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
async def get_replication_rows(
|
||||
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
|
||||
@@ -176,11 +161,6 @@ class FederationSender(AbstractFederationSender):
|
||||
),
|
||||
)
|
||||
|
||||
# Map of user_id -> UserPresenceState for all the pending presence
|
||||
# to be sent out by user_id. Entries here get processed and put in
|
||||
# pending_presence_by_dest
|
||||
self.pending_presence = {} # type: Dict[str, UserPresenceState]
|
||||
|
||||
LaterGauge(
|
||||
"synapse_federation_transaction_queue_pending_pdus",
|
||||
"",
|
||||
@@ -201,8 +181,6 @@ class FederationSender(AbstractFederationSender):
|
||||
self._is_processing = False
|
||||
self._last_poked_id = -1
|
||||
|
||||
self._processing_pending_presence = False
|
||||
|
||||
# map from room_id to a set of PerDestinationQueues which we believe are
|
||||
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
|
||||
# here for a given room means that we are rate-limiting RR flushes to that room,
|
||||
@@ -546,48 +524,6 @@ class FederationSender(AbstractFederationSender):
|
||||
for queue in queues:
|
||||
queue.flush_read_receipts_for_room(room_id)
|
||||
|
||||
@preserve_fn # the caller should not yield on this
|
||||
async def send_presence(self, states: List[UserPresenceState]) -> None:
|
||||
"""Send the new presence states to the appropriate destinations.
|
||||
|
||||
This actually queues up the presence states ready for sending and
|
||||
triggers a background task to process them and send out the transactions.
|
||||
"""
|
||||
if not self.hs.config.use_presence:
|
||||
# No-op if presence is disabled.
|
||||
return
|
||||
|
||||
# First we queue up the new presence by user ID, so multiple presence
|
||||
# updates in quick succession are correctly handled.
|
||||
# We only want to send presence for our own users, so lets always just
|
||||
# filter here just in case.
|
||||
self.pending_presence.update(
|
||||
{state.user_id: state for state in states if self.is_mine_id(state.user_id)}
|
||||
)
|
||||
|
||||
# We then handle the new pending presence in batches, first figuring
|
||||
# out the destinations we need to send each state to and then poking it
|
||||
# to attempt a new transaction. We linearize this so that we don't
|
||||
# accidentally mess up the ordering and send multiple presence updates
|
||||
# in the wrong order
|
||||
if self._processing_pending_presence:
|
||||
return
|
||||
|
||||
self._processing_pending_presence = True
|
||||
try:
|
||||
while True:
|
||||
states_map = self.pending_presence
|
||||
self.pending_presence = {}
|
||||
|
||||
if not states_map:
|
||||
break
|
||||
|
||||
await self._process_presence_inner(list(states_map.values()))
|
||||
except Exception:
|
||||
logger.exception("Error sending presence states to servers")
|
||||
finally:
|
||||
self._processing_pending_presence = False
|
||||
|
||||
def send_presence_to_destinations(
|
||||
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
||||
) -> None:
|
||||
@@ -608,40 +544,6 @@ class FederationSender(AbstractFederationSender):
|
||||
continue
|
||||
self._get_per_destination_queue(destination).send_presence(states)
|
||||
|
||||
@measure_func("txnqueue._process_presence")
|
||||
async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
|
||||
"""Given a list of states populate self.pending_presence_by_dest and
|
||||
poke to send a new transaction to each destination
|
||||
"""
|
||||
# We pull the presence router here instead of __init__
|
||||
# to prevent a dependency cycle:
|
||||
#
|
||||
# AuthHandler -> Notifier -> FederationSender
|
||||
# -> PresenceRouter -> ModuleApi -> AuthHandler
|
||||
if self._presence_router is None:
|
||||
self._presence_router = self.hs.get_presence_router()
|
||||
|
||||
assert self._presence_router is not None
|
||||
|
||||
hosts_and_states = await get_interested_remotes(
|
||||
self.store,
|
||||
self._presence_router,
|
||||
states,
|
||||
self.state,
|
||||
)
|
||||
|
||||
for destinations, states in hosts_and_states:
|
||||
for destination in destinations:
|
||||
if destination == self.server_name:
|
||||
continue
|
||||
|
||||
if not self._federation_shard_config.should_handle(
|
||||
self._instance_name, destination
|
||||
):
|
||||
continue
|
||||
|
||||
self._get_per_destination_queue(destination).send_presence(states)
|
||||
|
||||
def build_and_send_edu(
|
||||
self,
|
||||
destination: str,
|
||||
@@ -729,10 +631,6 @@ class FederationSender(AbstractFederationSender):
|
||||
# to a worker.
|
||||
return 0
|
||||
|
||||
def federation_ack(self, instance_name: str, token: int) -> None:
|
||||
# It is not expected that this gets called on FederationSender.
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
async def get_replication_rows(
|
||||
instance_name: str, from_token: int, to_token: int, target_row_count: int
|
||||
|
||||
@@ -484,7 +484,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
"device_list_key", position, users=[user_id], rooms=room_ids
|
||||
)
|
||||
|
||||
if hosts:
|
||||
if hosts and self.federation_sender:
|
||||
logger.info(
|
||||
"Sending device list update notif for %r to: %r", user_id, hosts
|
||||
)
|
||||
|
||||
@@ -51,9 +51,7 @@ class DeviceMessageHandler:
|
||||
# same instance. Other federation sender instances will get notified by
|
||||
# `synapse.app.generic_worker.FederationSenderHandler` when it sees it
|
||||
# in the to-device replication stream.
|
||||
self.federation_sender = None
|
||||
if hs.should_send_federation():
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
|
||||
# If we can handle the to device EDUs we do so, otherwise we route them
|
||||
# to the appropriate worker.
|
||||
|
||||
@@ -24,6 +24,7 @@ The methods that define policy are:
|
||||
import abc
|
||||
import contextlib
|
||||
import logging
|
||||
from bisect import bisect
|
||||
from contextlib import contextmanager
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
@@ -53,7 +54,9 @@ from synapse.replication.http.presence import (
|
||||
ReplicationBumpPresenceActiveTime,
|
||||
ReplicationPresenceSetState,
|
||||
)
|
||||
from synapse.replication.http.streams import ReplicationGetStreamUpdates
|
||||
from synapse.replication.tcp.commands import ClearUserSyncsCommand
|
||||
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
|
||||
from synapse.state import StateHandler
|
||||
from synapse.storage.databases.main import DataStore
|
||||
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
|
||||
@@ -124,6 +127,8 @@ class BasePresenceHandler(abc.ABC):
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
self.federation_queue = PresenceFederationQueue(hs, self)
|
||||
|
||||
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
|
||||
|
||||
active_presence = self.store.take_presence_startup_info()
|
||||
@@ -245,9 +250,17 @@ class BasePresenceHandler(abc.ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
async def process_replication_rows(self, token, rows):
|
||||
async def process_replication_rows(
|
||||
self, stream_name: str, instance_name: str, token: int, rows: list
|
||||
):
|
||||
"""Process presence stream rows received over replication."""
|
||||
pass
|
||||
await self.federation_queue.process_replication_rows(
|
||||
stream_name, instance_name, token, rows
|
||||
)
|
||||
|
||||
def get_federation_queue(self) -> "PresenceFederationQueue":
|
||||
"""Get the presence federation queue, if any."""
|
||||
return self.federation_queue
|
||||
|
||||
|
||||
class _NullContextManager(ContextManager[None]):
|
||||
@@ -265,6 +278,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||
|
||||
self.presence_router = hs.get_presence_router()
|
||||
self._presence_enabled = hs.config.use_presence
|
||||
self.state = hs.get_state_handler()
|
||||
|
||||
# The number of ongoing syncs on this process, by user id.
|
||||
# Empty if _presence_enabled is false.
|
||||
@@ -273,6 +287,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||
self.notifier = hs.get_notifier()
|
||||
self.instance_id = hs.get_instance_id()
|
||||
|
||||
self._federation = hs.get_federation_sender()
|
||||
|
||||
# user_id -> last_sync_ms. Lists the users that have stopped syncing
|
||||
# but we haven't notified the master of that yet
|
||||
self.users_going_offline = {}
|
||||
@@ -388,7 +404,14 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||
users=users_to_states.keys(),
|
||||
)
|
||||
|
||||
async def process_replication_rows(self, token, rows):
|
||||
async def process_replication_rows(
|
||||
self, stream_name: str, instance_name: str, token: int, rows: list
|
||||
):
|
||||
await super().process_replication_rows(stream_name, instance_name, token, rows)
|
||||
|
||||
if stream_name != PresenceStream.NAME:
|
||||
return
|
||||
|
||||
states = [
|
||||
UserPresenceState(
|
||||
row.user_id,
|
||||
@@ -408,6 +431,20 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||
stream_id = token
|
||||
await self.notify_from_replication(states, stream_id)
|
||||
|
||||
# Handle poking the local federation sender, if there is one.
|
||||
if not self._federation:
|
||||
return
|
||||
|
||||
hosts_and_states = await get_interested_remotes(
|
||||
self.store,
|
||||
self.presence_router,
|
||||
states,
|
||||
self.state,
|
||||
)
|
||||
|
||||
for destinations, states in hosts_and_states:
|
||||
self._federation.send_presence_to_destinations(states, destinations)
|
||||
|
||||
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
|
||||
return [
|
||||
user_id
|
||||
@@ -463,11 +500,12 @@ class PresenceHandler(BasePresenceHandler):
|
||||
self.server_name = hs.hostname
|
||||
self.wheel_timer = WheelTimer()
|
||||
self.notifier = hs.get_notifier()
|
||||
self.federation = hs.get_federation_sender()
|
||||
self.state = hs.get_state_handler()
|
||||
self.presence_router = hs.get_presence_router()
|
||||
self._presence_enabled = hs.config.use_presence
|
||||
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
|
||||
federation_registry = hs.get_federation_registry()
|
||||
|
||||
federation_registry.register_edu_handler("m.presence", self.incoming_presence)
|
||||
@@ -680,7 +718,17 @@ class PresenceHandler(BasePresenceHandler):
|
||||
if to_federation_ping:
|
||||
federation_presence_out_counter.inc(len(to_federation_ping))
|
||||
|
||||
self._push_to_remotes(to_federation_ping.values())
|
||||
hosts_and_states = await get_interested_remotes(
|
||||
self.store,
|
||||
self.presence_router,
|
||||
list(to_federation_ping.values()),
|
||||
self.state,
|
||||
)
|
||||
|
||||
for destinations, states in hosts_and_states:
|
||||
self.federation_queue.send_presence_to_destinations(
|
||||
states, destinations
|
||||
)
|
||||
|
||||
async def _handle_timeouts(self):
|
||||
"""Checks the presence of users that have timed out and updates as
|
||||
@@ -920,15 +968,21 @@ class PresenceHandler(BasePresenceHandler):
|
||||
users=[UserID.from_string(u) for u in users_to_states],
|
||||
)
|
||||
|
||||
self._push_to_remotes(states)
|
||||
# We only need to tell the local federation sender, if any, that new
|
||||
# presence has happened. Other federation senders will get notified via
|
||||
# the presence replication stream.
|
||||
if not self.federation_sender:
|
||||
return
|
||||
|
||||
def _push_to_remotes(self, states):
|
||||
"""Sends state updates to remote servers.
|
||||
hosts_and_states = await get_interested_remotes(
|
||||
self.store,
|
||||
self.presence_router,
|
||||
states,
|
||||
self.state,
|
||||
)
|
||||
|
||||
Args:
|
||||
states (list(UserPresenceState))
|
||||
"""
|
||||
self.federation.send_presence(states)
|
||||
for destinations, states in hosts_and_states:
|
||||
self.federation_sender.send_presence_to_destinations(states, destinations)
|
||||
|
||||
async def incoming_presence(self, origin, content):
|
||||
"""Called when we receive a `m.presence` EDU from a remote server."""
|
||||
@@ -1166,7 +1220,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
|
||||
# Send out user presence updates for each destination
|
||||
for destination, user_state_set in presence_destinations.items():
|
||||
self.federation.send_presence_to_destinations(
|
||||
self.federation_queue.send_presence_to_destinations(
|
||||
destinations=[destination], states=user_state_set
|
||||
)
|
||||
|
||||
@@ -1811,3 +1865,171 @@ async def get_interested_remotes(
|
||||
hosts_and_states.append(([host], states))
|
||||
|
||||
return hosts_and_states
|
||||
|
||||
|
||||
class PresenceFederationQueue:
|
||||
"""Handles sending ad hoc presence updates over federation, which are *not*
|
||||
due to state updates (that get handled via the presence stream), e.g.
|
||||
federation pings and sending existing present states to newly joined hosts.
|
||||
|
||||
Only the last N minutes will be queued, so if a federation sender instance
|
||||
is down for longer then some updates will be dropped. This is OK as presence
|
||||
is ephemeral, and so it will self correct eventually.
|
||||
"""
|
||||
|
||||
# How long to keep entries in the queue for. Workers that are down for
|
||||
# longer than this duration will miss out on older updates.
|
||||
_KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000
|
||||
|
||||
# How often to check if we can expire entries from the queue.
|
||||
_CLEAR_ITEMS_EVERY_MS = 60 * 1000
|
||||
|
||||
def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
|
||||
self._clock = hs.get_clock()
|
||||
self._notifier = hs.get_notifier()
|
||||
self._instance_name = hs.get_instance_name()
|
||||
self._presence_handler = presence_handler
|
||||
self._repl_client = ReplicationGetStreamUpdates.make_client(hs)
|
||||
|
||||
# Should we keep a queue of recent presence updates? We only bother if
|
||||
# another process may be handling federation sending.
|
||||
self._queue_presence_updates = True
|
||||
|
||||
# The federation sender if this instance is a federation sender.
|
||||
self._federation = hs.get_federation_sender()
|
||||
|
||||
if self._federation:
|
||||
# We don't bother queuing up presence states if only this instance
|
||||
# is sending federation.
|
||||
if hs.config.worker.federation_shard_config.instances == [
|
||||
self._instance_name
|
||||
]:
|
||||
self._queue_presence_updates = False
|
||||
|
||||
# The queue of recently queued updates as tuples of: `(timestamp,
|
||||
# stream_id, destinations, user_ids)`. We don't store the full states
|
||||
# for efficiency, and remote workers will already have the full states
|
||||
# cached.
|
||||
self._queue = [] # type: List[Tuple[int, int, Collection[str], Set[str]]]
|
||||
|
||||
self._next_id = 1
|
||||
|
||||
# Map from instance name to current token
|
||||
self._current_tokens = {} # type: Dict[str, int]
|
||||
|
||||
if self._queue_presence_updates:
|
||||
self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS)
|
||||
|
||||
def _clear_queue(self):
|
||||
"""Clear out older entries from the queue."""
|
||||
clear_before = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS
|
||||
|
||||
# The queue is sorted by timestamp, so we can bisect to find the right
|
||||
# place to purge before. Note that we are searching using a 1-tuple with
|
||||
# the time, which does The Right Thing since the queue is a tuple where
|
||||
# the first item is a timestamp.
|
||||
index = bisect(self._queue, (clear_before,))
|
||||
self._queue = self._queue[index:]
|
||||
|
||||
def send_presence_to_destinations(
|
||||
self, states: Collection[UserPresenceState], destinations: Collection[str]
|
||||
) -> None:
|
||||
"""Send the presence states to the given destinations.
|
||||
|
||||
Will forward to the local federation sender (if there is one) and queue
|
||||
to send over replication (if there are other federation sender instances.).
|
||||
"""
|
||||
|
||||
if self._federation:
|
||||
self._federation.send_presence_to_destinations(states, destinations)
|
||||
|
||||
if not self._queue_presence_updates:
|
||||
return
|
||||
|
||||
now = self._clock.time_msec()
|
||||
|
||||
stream_id = self._next_id
|
||||
self._next_id += 1
|
||||
|
||||
self._queue.append((now, stream_id, destinations, {s.user_id for s in states}))
|
||||
|
||||
self._notifier.notify_replication()
|
||||
|
||||
def get_current_token(self, instance_name: str) -> int:
|
||||
if instance_name == self._instance_name:
|
||||
return self._next_id - 1
|
||||
else:
|
||||
return self._current_tokens.get(instance_name, 0)
|
||||
|
||||
async def get_replication_rows(
|
||||
self,
|
||||
instance_name: str,
|
||||
from_token: int,
|
||||
upto_token: int,
|
||||
target_row_count: int,
|
||||
) -> Tuple[List[Tuple[int, Tuple[str, str]]], int, bool]:
|
||||
"""Get all the updates between the two tokens.
|
||||
|
||||
We return rows in the form of `(destination, user_id)` to keep the size
|
||||
of each row bounded (rather than returning the sets in a row).
|
||||
"""
|
||||
if instance_name != self._instance_name:
|
||||
# If not local we query over replication.
|
||||
result = await self._repl_client(
|
||||
instance_name=instance_name,
|
||||
stream_name=PresenceFederationStream.NAME,
|
||||
from_token=from_token,
|
||||
upto_token=upto_token,
|
||||
)
|
||||
return result["updates"], result["upto_token"], result["limited"]
|
||||
|
||||
# We can find the correct position in the queue by noting that there is
|
||||
# exactly one entry per stream ID, and that the last entry has an ID of
|
||||
# `self._next_id - 1`, so we can count backwards from the end.
|
||||
#
|
||||
# Since the start of the queue is periodically truncated we need to
|
||||
# handle the case where `from_token` stream ID has already been dropped.
|
||||
start_idx = max(from_token - self._next_id, -len(self._queue))
|
||||
|
||||
to_send = [] # type: List[Tuple[int, Tuple[str, str]]]
|
||||
limited = False
|
||||
new_id = upto_token
|
||||
for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
|
||||
if stream_id > upto_token:
|
||||
break
|
||||
|
||||
new_id = stream_id
|
||||
|
||||
to_send.extend(
|
||||
(stream_id, (destination, user_id))
|
||||
for destination in destinations
|
||||
for user_id in user_ids
|
||||
)
|
||||
|
||||
if len(to_send) > target_row_count:
|
||||
limited = True
|
||||
break
|
||||
|
||||
return to_send, new_id, limited
|
||||
|
||||
async def process_replication_rows(
|
||||
self, stream_name: str, instance_name: str, token: int, rows: list
|
||||
):
|
||||
if stream_name != PresenceFederationStream.NAME:
|
||||
return
|
||||
|
||||
# We keep track of the current tokens
|
||||
self._current_tokens[instance_name] = token
|
||||
|
||||
# If we're a federation sender we pull out the presence states to send
|
||||
# and forward them on.
|
||||
if not self._federation:
|
||||
return
|
||||
|
||||
hosts_to_users = {} # type: Dict[str, Set[str]]
|
||||
for row in rows:
|
||||
hosts_to_users.setdefault(row.destination, set()).add(row.user_id)
|
||||
|
||||
for host, user_ids in hosts_to_users.items():
|
||||
states = await self._presence_handler.current_state_for_users(user_ids)
|
||||
self._federation.send_presence_to_destinations(states.values(), [host])
|
||||
|
||||
@@ -36,9 +36,7 @@ class ReceiptsHandler(BaseHandler):
|
||||
# same instance. Other federation sender instances will get notified by
|
||||
# `synapse.app.generic_worker.FederationSenderHandler` when it sees it
|
||||
# in the receipts stream.
|
||||
self.federation_sender = None
|
||||
if hs.should_send_federation():
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
|
||||
# If we can handle the receipt EDUs we do so, otherwise we route them
|
||||
# to the appropriate worker.
|
||||
|
||||
@@ -57,9 +57,7 @@ class FollowerTypingHandler:
|
||||
self.clock = hs.get_clock()
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
|
||||
self.federation = None
|
||||
if hs.should_send_federation():
|
||||
self.federation = hs.get_federation_sender()
|
||||
self.federation = hs.get_federation_sender()
|
||||
|
||||
if hs.config.worker.writers.typing != hs.get_instance_name():
|
||||
hs.get_federation_registry().register_instance_for_edu(
|
||||
|
||||
@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Any, Generator, Iterable, List, Optional, Tupl
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.handlers.presence import get_interested_remotes
|
||||
from synapse.http.client import SimpleHttpClient
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
@@ -50,6 +51,10 @@ class ModuleApi:
|
||||
self._auth_handler = auth_handler
|
||||
self._server_name = hs.hostname
|
||||
self._presence_stream = hs.get_event_sources().sources["presence"]
|
||||
self._state = hs.get_state_handler()
|
||||
self._presence_router = hs.get_presence_router()
|
||||
|
||||
self._federation = self._hs.get_federation_sender()
|
||||
|
||||
# We expose these as properties below in order to attach a helpful docstring.
|
||||
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
|
||||
@@ -423,6 +428,9 @@ class ModuleApi:
|
||||
# Force a presence initial_sync for this user next time
|
||||
self._send_full_presence_to_local_users.add(user)
|
||||
else:
|
||||
if not self._federation:
|
||||
continue
|
||||
|
||||
# Retrieve presence state for currently online users that this user
|
||||
# is considered interested in
|
||||
presence_events, _ = await self._presence_stream.get_new_events(
|
||||
@@ -430,12 +438,16 @@ class ModuleApi:
|
||||
)
|
||||
|
||||
# Send to remote destinations
|
||||
await make_deferred_yieldable(
|
||||
# We pull the federation sender here as we can only do so on workers
|
||||
# that support sending presence
|
||||
self._hs.get_federation_sender().send_presence(presence_events)
|
||||
hosts_and_states = await get_interested_remotes(
|
||||
self._store,
|
||||
self._presence_router,
|
||||
presence_events,
|
||||
self._state,
|
||||
)
|
||||
|
||||
for destinations, states in hosts_and_states:
|
||||
self._federation.send_presence_to_destinations(states, destinations)
|
||||
|
||||
|
||||
class PublicRoomListManager:
|
||||
"""Contains methods for adding to, removing from and querying whether a room
|
||||
|
||||
@@ -227,9 +227,7 @@ class Notifier:
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
self._pusher_pool = hs.get_pusherpool()
|
||||
|
||||
self.federation_sender = None
|
||||
if hs.should_send_federation():
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
|
||||
self.state_handler = hs.get_state_handler()
|
||||
|
||||
|
||||
@@ -20,16 +20,13 @@ from twisted.internet.defer import Deferred
|
||||
from twisted.internet.protocol import ReconnectingClientFactory
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.federation import send_queue
|
||||
from synapse.federation.sender import FederationSender
|
||||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
|
||||
from synapse.replication.tcp.streams import (
|
||||
AccountDataStream,
|
||||
DeviceListsStream,
|
||||
GroupServerStream,
|
||||
PresenceStream,
|
||||
PushersStream,
|
||||
PushRulesStream,
|
||||
ReceiptsStream,
|
||||
@@ -191,8 +188,6 @@ class ReplicationDataHandler:
|
||||
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
|
||||
else:
|
||||
await self.start_pusher(row.user_id, row.app_id, row.pushkey)
|
||||
elif stream_name == PresenceStream.NAME:
|
||||
await self._presence_handler.process_replication_rows(token, rows)
|
||||
elif stream_name == EventsStream.NAME:
|
||||
# We shouldn't get multiple rows per token for events stream, so
|
||||
# we don't need to optimise this for multiple rows.
|
||||
@@ -221,6 +216,10 @@ class ReplicationDataHandler:
|
||||
membership=row.data.membership,
|
||||
)
|
||||
|
||||
await self._presence_handler.process_replication_rows(
|
||||
stream_name, instance_name, token, rows
|
||||
)
|
||||
|
||||
# Notify any waiting deferreds. The list is ordered by position so we
|
||||
# just iterate through the list until we reach a position that is
|
||||
# greater than the received row position.
|
||||
@@ -338,6 +337,7 @@ class FederationSenderHandler:
|
||||
self.store = hs.get_datastore()
|
||||
self._is_mine_id = hs.is_mine_id
|
||||
self._hs = hs
|
||||
self._presence_handler = hs.get_presence_handler()
|
||||
|
||||
# We need to make a temporary value to ensure that mypy picks up the
|
||||
# right type. We know we should have a federation sender instance since
|
||||
@@ -356,14 +356,8 @@ class FederationSenderHandler:
|
||||
self.federation_sender.wake_destination(server)
|
||||
|
||||
async def process_replication_rows(self, stream_name, token, rows):
|
||||
# The federation stream contains things that we want to send out, e.g.
|
||||
# presence, typing, etc.
|
||||
if stream_name == "federation":
|
||||
send_queue.process_rows_for_federation(self.federation_sender, rows)
|
||||
await self.update_token(token)
|
||||
|
||||
# ... and when new receipts happen
|
||||
elif stream_name == ReceiptsStream.NAME:
|
||||
if stream_name == ReceiptsStream.NAME:
|
||||
await self._on_new_receipts(rows)
|
||||
|
||||
# ... as well as device updates and messages
|
||||
@@ -401,54 +395,3 @@ class FederationSenderHandler:
|
||||
receipt.data,
|
||||
)
|
||||
await self.federation_sender.send_read_receipt(receipt_info)
|
||||
|
||||
async def update_token(self, token):
|
||||
"""Update the record of where we have processed to in the federation stream.
|
||||
|
||||
Called after we have processed a an update received over replication. Sends
|
||||
a FEDERATION_ACK back to the master, and stores the token that we have processed
|
||||
in `federation_stream_position` so that we can restart where we left off.
|
||||
"""
|
||||
self.federation_position = token
|
||||
|
||||
# We save and send the ACK to master asynchronously, so we don't block
|
||||
# processing on persistence. We don't need to do this operation for
|
||||
# every single RDATA we receive, we just need to do it periodically.
|
||||
|
||||
if self._fed_position_linearizer.is_queued(None):
|
||||
# There is already a task queued up to save and send the token, so
|
||||
# no need to queue up another task.
|
||||
return
|
||||
|
||||
run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
|
||||
|
||||
async def _save_and_send_ack(self):
|
||||
"""Save the current federation position in the database and send an ACK
|
||||
to master with where we're up to.
|
||||
"""
|
||||
# We should only be calling this once we've got a token.
|
||||
assert self.federation_position is not None
|
||||
|
||||
try:
|
||||
# We linearize here to ensure we don't have races updating the token
|
||||
#
|
||||
# XXX this appears to be redundant, since the ReplicationCommandHandler
|
||||
# has a linearizer which ensures that we only process one line of
|
||||
# replication data at a time. Should we remove it, or is it doing useful
|
||||
# service for robustness? Or could we replace it with an assertion that
|
||||
# we're not being re-entered?
|
||||
|
||||
with (await self._fed_position_linearizer.queue(None)):
|
||||
# We persist and ack the same position, so we take a copy of it
|
||||
# here as otherwise it can get modified from underneath us.
|
||||
current_position = self.federation_position
|
||||
|
||||
await self.store.update_federation_out_pos(
|
||||
"federation", current_position
|
||||
)
|
||||
|
||||
# We ACK this token over replication so that the master can drop
|
||||
# its in memory queues
|
||||
self._hs.get_tcp_replication().send_federation_ack(current_position)
|
||||
except Exception:
|
||||
logger.exception("Error updating federation stream position")
|
||||
|
||||
@@ -297,33 +297,6 @@ class ClearUserSyncsCommand(Command):
|
||||
return self.instance_id
|
||||
|
||||
|
||||
class FederationAckCommand(Command):
|
||||
"""Sent by the client when it has processed up to a given point in the
|
||||
federation stream. This allows the master to drop in-memory caches of the
|
||||
federation stream.
|
||||
|
||||
This must only be sent from one worker (i.e. the one sending federation)
|
||||
|
||||
Format::
|
||||
|
||||
FEDERATION_ACK <instance_name> <token>
|
||||
"""
|
||||
|
||||
NAME = "FEDERATION_ACK"
|
||||
|
||||
def __init__(self, instance_name: str, token: int):
|
||||
self.instance_name = instance_name
|
||||
self.token = token
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line: str) -> "FederationAckCommand":
|
||||
instance_name, token = line.split(" ")
|
||||
return cls(instance_name, int(token))
|
||||
|
||||
def to_line(self) -> str:
|
||||
return "%s %s" % (self.instance_name, self.token)
|
||||
|
||||
|
||||
class UserIpCommand(Command):
|
||||
"""Sent periodically when a worker sees activity from a client.
|
||||
|
||||
@@ -389,7 +362,6 @@ _COMMANDS = (
|
||||
NameCommand,
|
||||
ReplicateCommand,
|
||||
UserSyncCommand,
|
||||
FederationAckCommand,
|
||||
UserIpCommand,
|
||||
RemoteServerUpCommand,
|
||||
ClearUserSyncsCommand,
|
||||
@@ -415,7 +387,6 @@ VALID_CLIENT_COMMANDS = (
|
||||
PingCommand.NAME,
|
||||
UserSyncCommand.NAME,
|
||||
ClearUserSyncsCommand.NAME,
|
||||
FederationAckCommand.NAME,
|
||||
UserIpCommand.NAME,
|
||||
ErrorCommand.NAME,
|
||||
RemoteServerUpCommand.NAME,
|
||||
|
||||
@@ -39,7 +39,6 @@ from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
|
||||
from synapse.replication.tcp.commands import (
|
||||
ClearUserSyncsCommand,
|
||||
Command,
|
||||
FederationAckCommand,
|
||||
PositionCommand,
|
||||
RdataCommand,
|
||||
RemoteServerUpCommand,
|
||||
@@ -54,7 +53,6 @@ from synapse.replication.tcp.streams import (
|
||||
BackfillStream,
|
||||
CachesStream,
|
||||
EventsStream,
|
||||
FederationStream,
|
||||
ReceiptsStream,
|
||||
Stream,
|
||||
TagAccountDataStream,
|
||||
@@ -73,7 +71,6 @@ inbound_rdata_count = Counter(
|
||||
"synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
|
||||
)
|
||||
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
|
||||
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
|
||||
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
|
||||
|
||||
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
|
||||
@@ -157,11 +154,6 @@ class ReplicationCommandHandler:
|
||||
if hs.config.worker_app is not None:
|
||||
continue
|
||||
|
||||
if stream.NAME == FederationStream.NAME and hs.config.send_federation:
|
||||
# We only support federation stream if federation sending
|
||||
# has been disabled on the master.
|
||||
continue
|
||||
|
||||
self._streams_to_replicate.append(stream)
|
||||
|
||||
# Map of stream name to batched updates. See RdataCommand for info on
|
||||
@@ -365,14 +357,6 @@ class ReplicationCommandHandler:
|
||||
else:
|
||||
return None
|
||||
|
||||
def on_FEDERATION_ACK(
|
||||
self, conn: IReplicationConnection, cmd: FederationAckCommand
|
||||
):
|
||||
federation_ack_counter.inc()
|
||||
|
||||
if self._federation_sender:
|
||||
self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
|
||||
|
||||
def on_USER_IP(
|
||||
self, conn: IReplicationConnection, cmd: UserIpCommand
|
||||
) -> Optional[Awaitable[None]]:
|
||||
@@ -655,12 +639,6 @@ class ReplicationCommandHandler:
|
||||
else:
|
||||
logger.warning("Dropping command as not connected: %r", cmd.NAME)
|
||||
|
||||
def send_federation_ack(self, token: int):
|
||||
"""Ack data for the federation stream. This allows the master to drop
|
||||
data stored purely in memory.
|
||||
"""
|
||||
self.send_command(FederationAckCommand(self._instance_name, token))
|
||||
|
||||
def send_user_sync(
|
||||
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
|
||||
):
|
||||
|
||||
@@ -30,6 +30,7 @@ from synapse.replication.tcp.streams._base import (
|
||||
CachesStream,
|
||||
DeviceListsStream,
|
||||
GroupServerStream,
|
||||
PresenceFederationStream,
|
||||
PresenceStream,
|
||||
PublicRoomsStream,
|
||||
PushersStream,
|
||||
@@ -42,7 +43,6 @@ from synapse.replication.tcp.streams._base import (
|
||||
UserSignatureStream,
|
||||
)
|
||||
from synapse.replication.tcp.streams.events import EventsStream
|
||||
from synapse.replication.tcp.streams.federation import FederationStream
|
||||
|
||||
STREAMS_MAP = {
|
||||
stream.NAME: stream
|
||||
@@ -50,6 +50,7 @@ STREAMS_MAP = {
|
||||
EventsStream,
|
||||
BackfillStream,
|
||||
PresenceStream,
|
||||
PresenceFederationStream,
|
||||
TypingStream,
|
||||
ReceiptsStream,
|
||||
PushRulesStream,
|
||||
@@ -58,7 +59,6 @@ STREAMS_MAP = {
|
||||
PublicRoomsStream,
|
||||
DeviceListsStream,
|
||||
ToDeviceStream,
|
||||
FederationStream,
|
||||
TagAccountDataStream,
|
||||
AccountDataStream,
|
||||
GroupServerStream,
|
||||
@@ -71,6 +71,7 @@ __all__ = [
|
||||
"Stream",
|
||||
"BackfillStream",
|
||||
"PresenceStream",
|
||||
"PresenceFederationStream",
|
||||
"TypingStream",
|
||||
"ReceiptsStream",
|
||||
"PushRulesStream",
|
||||
|
||||
@@ -290,6 +290,30 @@ class PresenceStream(Stream):
|
||||
)
|
||||
|
||||
|
||||
class PresenceFederationStream(Stream):
|
||||
"""A stream used to send ad hoc presence updates over federation.
|
||||
|
||||
Streams the remote destination and the user ID of the presence state to
|
||||
send.
|
||||
"""
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class PresenceFederationStreamRow:
|
||||
destination: str
|
||||
user_id: str
|
||||
|
||||
NAME = "presence_federation"
|
||||
ROW_TYPE = PresenceFederationStreamRow
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
federation_queue = hs.get_presence_handler().get_federation_queue()
|
||||
super().__init__(
|
||||
hs.get_instance_name(),
|
||||
federation_queue.get_current_token,
|
||||
federation_queue.get_replication_rows,
|
||||
)
|
||||
|
||||
|
||||
class TypingStream(Stream):
|
||||
TypingStreamRow = namedtuple(
|
||||
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
|
||||
|
||||
@@ -1,80 +0,0 @@
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
# Copyright 2019 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from collections import namedtuple
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
|
||||
|
||||
from synapse.replication.tcp.streams._base import (
|
||||
Stream,
|
||||
current_token_without_instance,
|
||||
make_http_update_function,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
class FederationStream(Stream):
|
||||
"""Data to be sent over federation. Only available when master has federation
|
||||
sending disabled.
|
||||
"""
|
||||
|
||||
FederationStreamRow = namedtuple(
|
||||
"FederationStreamRow",
|
||||
(
|
||||
"type", # str, the type of data as defined in the BaseFederationRows
|
||||
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
|
||||
),
|
||||
)
|
||||
|
||||
NAME = "federation"
|
||||
ROW_TYPE = FederationStreamRow
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
if hs.config.worker_app is None:
|
||||
# master process: get updates from the FederationRemoteSendQueue.
|
||||
# (if the master is configured to send federation itself, federation_sender
|
||||
# will be a real FederationSender, which has stubs for current_token and
|
||||
# get_replication_rows.)
|
||||
federation_sender = hs.get_federation_sender()
|
||||
current_token = current_token_without_instance(
|
||||
federation_sender.get_current_token
|
||||
)
|
||||
update_function = (
|
||||
federation_sender.get_replication_rows
|
||||
) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
|
||||
|
||||
elif hs.should_send_federation():
|
||||
# federation sender: Query master process
|
||||
update_function = make_http_update_function(hs, self.NAME)
|
||||
current_token = self._stub_current_token
|
||||
|
||||
else:
|
||||
# other worker: stub out the update function (we're not interested in
|
||||
# any updates so when we get a POSITION we do nothing)
|
||||
update_function = self._stub_update_function
|
||||
current_token = self._stub_current_token
|
||||
|
||||
super().__init__(hs.get_instance_name(), current_token, update_function)
|
||||
|
||||
@staticmethod
|
||||
def _stub_current_token(instance_name: str) -> int:
|
||||
# dummy current-token method for use on workers
|
||||
return 0
|
||||
|
||||
@staticmethod
|
||||
async def _stub_update_function(
|
||||
instance_name: str, from_token: int, upto_token: int, limit: int
|
||||
) -> Tuple[list, int, bool]:
|
||||
return [], upto_token, False
|
||||
@@ -59,7 +59,6 @@ from synapse.federation.federation_server import (
|
||||
FederationHandlerRegistry,
|
||||
FederationServer,
|
||||
)
|
||||
from synapse.federation.send_queue import FederationRemoteSendQueue
|
||||
from synapse.federation.sender import AbstractFederationSender, FederationSender
|
||||
from synapse.federation.transport.client import TransportLayerClient
|
||||
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
|
||||
@@ -580,13 +579,11 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
return TransportLayerClient(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_federation_sender(self) -> AbstractFederationSender:
|
||||
def get_federation_sender(self) -> Optional[AbstractFederationSender]:
|
||||
if self.should_send_federation():
|
||||
return FederationSender(self)
|
||||
elif not self.config.worker_app:
|
||||
return FederationRemoteSendQueue(self)
|
||||
else:
|
||||
raise Exception("Workers cannot send federation traffic")
|
||||
return None
|
||||
|
||||
@cache_in_self
|
||||
def get_receipts_handler(self) -> ReceiptsHandler:
|
||||
|
||||
@@ -471,6 +471,168 @@ class PresenceHandlerTestCase(unittest.HomeserverTestCase):
|
||||
self.assertEqual(state.state, PresenceState.OFFLINE)
|
||||
|
||||
|
||||
class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
|
||||
def prepare(self, reactor, clock, hs):
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.clock = hs.get_clock()
|
||||
self.instance_name = hs.get_instance_name()
|
||||
|
||||
self.queue = self.presence_handler.get_federation_queue()
|
||||
|
||||
def test_send_and_get(self):
|
||||
state1 = UserPresenceState.default("@user1:test")
|
||||
state2 = UserPresenceState.default("@user2:test")
|
||||
state3 = UserPresenceState.default("@user3:test")
|
||||
|
||||
prev_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
||||
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
||||
|
||||
now_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
rows, upto_token, limited = self.get_success(
|
||||
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
||||
)
|
||||
|
||||
self.assertEqual(upto_token, now_token)
|
||||
self.assertFalse(limited)
|
||||
|
||||
expected_rows = [
|
||||
(1, ("dest1", "@user1:test")),
|
||||
(1, ("dest2", "@user1:test")),
|
||||
(1, ("dest1", "@user2:test")),
|
||||
(1, ("dest2", "@user2:test")),
|
||||
(2, ("dest3", "@user3:test")),
|
||||
]
|
||||
|
||||
self.assertCountEqual(rows, expected_rows)
|
||||
|
||||
def test_send_and_get_split(self):
|
||||
state1 = UserPresenceState.default("@user1:test")
|
||||
state2 = UserPresenceState.default("@user2:test")
|
||||
state3 = UserPresenceState.default("@user3:test")
|
||||
|
||||
prev_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
||||
|
||||
now_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
||||
|
||||
rows, upto_token, limited = self.get_success(
|
||||
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
||||
)
|
||||
|
||||
self.assertEqual(upto_token, now_token)
|
||||
self.assertFalse(limited)
|
||||
|
||||
expected_rows = [
|
||||
(1, ("dest1", "@user1:test")),
|
||||
(1, ("dest2", "@user1:test")),
|
||||
(1, ("dest1", "@user2:test")),
|
||||
(1, ("dest2", "@user2:test")),
|
||||
]
|
||||
|
||||
self.assertCountEqual(rows, expected_rows)
|
||||
|
||||
def test_clear_queue_all(self):
|
||||
state1 = UserPresenceState.default("@user1:test")
|
||||
state2 = UserPresenceState.default("@user2:test")
|
||||
state3 = UserPresenceState.default("@user3:test")
|
||||
|
||||
prev_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
||||
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
||||
|
||||
self.reactor.advance(10 * 60 * 1000)
|
||||
|
||||
now_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
rows, upto_token, limited = self.get_success(
|
||||
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
||||
)
|
||||
self.assertEqual(upto_token, now_token)
|
||||
self.assertFalse(limited)
|
||||
self.assertCountEqual(rows, [])
|
||||
|
||||
prev_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
||||
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
||||
|
||||
now_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
rows, upto_token, limited = self.get_success(
|
||||
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
||||
)
|
||||
self.assertEqual(upto_token, now_token)
|
||||
self.assertFalse(limited)
|
||||
|
||||
expected_rows = [
|
||||
(3, ("dest1", "@user1:test")),
|
||||
(3, ("dest2", "@user1:test")),
|
||||
(3, ("dest1", "@user2:test")),
|
||||
(3, ("dest2", "@user2:test")),
|
||||
(4, ("dest3", "@user3:test")),
|
||||
]
|
||||
|
||||
self.assertCountEqual(rows, expected_rows)
|
||||
|
||||
def test_partially_clear_queue(self):
|
||||
state1 = UserPresenceState.default("@user1:test")
|
||||
state2 = UserPresenceState.default("@user2:test")
|
||||
state3 = UserPresenceState.default("@user3:test")
|
||||
|
||||
prev_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
||||
|
||||
self.reactor.advance(2 * 60 * 1000)
|
||||
|
||||
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
||||
|
||||
self.reactor.advance(4 * 60 * 1000)
|
||||
|
||||
now_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
rows, upto_token, limited = self.get_success(
|
||||
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
||||
)
|
||||
self.assertEqual(upto_token, now_token)
|
||||
self.assertFalse(limited)
|
||||
|
||||
expected_rows = [
|
||||
(2, ("dest3", "@user3:test")),
|
||||
]
|
||||
self.assertCountEqual(rows, [])
|
||||
|
||||
prev_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
||||
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
||||
|
||||
now_token = self.queue.get_current_token(self.instance_name)
|
||||
|
||||
rows, upto_token, limited = self.get_success(
|
||||
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
||||
)
|
||||
self.assertEqual(upto_token, now_token)
|
||||
self.assertFalse(limited)
|
||||
|
||||
expected_rows = [
|
||||
(3, ("dest1", "@user1:test")),
|
||||
(3, ("dest2", "@user1:test")),
|
||||
(3, ("dest1", "@user2:test")),
|
||||
(3, ("dest2", "@user2:test")),
|
||||
(4, ("dest3", "@user3:test")),
|
||||
]
|
||||
|
||||
self.assertCountEqual(rows, expected_rows)
|
||||
|
||||
|
||||
class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
||||
"""Tests remote servers get told about presence of users in the room when
|
||||
they join and when new local users join.
|
||||
|
||||
@@ -1,80 +0,0 @@
|
||||
# Copyright 2019 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.federation.send_queue import EduRow
|
||||
from synapse.replication.tcp.streams.federation import FederationStream
|
||||
|
||||
from tests.replication._base import BaseStreamTestCase
|
||||
|
||||
|
||||
class FederationStreamTestCase(BaseStreamTestCase):
|
||||
def _get_worker_hs_config(self) -> dict:
|
||||
# enable federation sending on the worker
|
||||
config = super()._get_worker_hs_config()
|
||||
# TODO: make it so we don't need both of these
|
||||
config["send_federation"] = False
|
||||
config["worker_app"] = "synapse.app.federation_sender"
|
||||
return config
|
||||
|
||||
def test_catchup(self):
|
||||
"""Basic test of catchup on reconnect
|
||||
|
||||
Makes sure that updates sent while we are offline are received later.
|
||||
"""
|
||||
fed_sender = self.hs.get_federation_sender()
|
||||
received_rows = self.test_handler.received_rdata_rows
|
||||
|
||||
fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
|
||||
|
||||
self.reconnect()
|
||||
self.reactor.advance(0)
|
||||
|
||||
# check we're testing what we think we are: no rows should yet have been
|
||||
# received
|
||||
self.assertEqual(received_rows, [])
|
||||
|
||||
# We should now see an attempt to connect to the master
|
||||
request = self.handle_http_replication_attempt()
|
||||
self.assert_request_is_get_repl_stream_updates(request, "federation")
|
||||
|
||||
# we should have received an update row
|
||||
stream_name, token, row = received_rows.pop()
|
||||
self.assertEqual(stream_name, "federation")
|
||||
self.assertIsInstance(row, FederationStream.FederationStreamRow)
|
||||
self.assertEqual(row.type, EduRow.TypeId)
|
||||
edurow = EduRow.from_data(row.data)
|
||||
self.assertEqual(edurow.edu.edu_type, "m.test_edu")
|
||||
self.assertEqual(edurow.edu.origin, self.hs.hostname)
|
||||
self.assertEqual(edurow.edu.destination, "testdest")
|
||||
self.assertEqual(edurow.edu.content, {"a": "b"})
|
||||
|
||||
self.assertEqual(received_rows, [])
|
||||
|
||||
# additional updates should be transferred without an HTTP hit
|
||||
fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
|
||||
self.reactor.advance(0)
|
||||
# there should be no http hit
|
||||
self.assertEqual(len(self.reactor.tcpClients), 0)
|
||||
# ... but we should have a row
|
||||
self.assertEqual(len(received_rows), 1)
|
||||
|
||||
stream_name, token, row = received_rows.pop()
|
||||
self.assertEqual(stream_name, "federation")
|
||||
self.assertIsInstance(row, FederationStream.FederationStreamRow)
|
||||
self.assertEqual(row.type, EduRow.TypeId)
|
||||
edurow = EduRow.from_data(row.data)
|
||||
self.assertEqual(edurow.edu.edu_type, "m.test1")
|
||||
self.assertEqual(edurow.edu.origin, self.hs.hostname)
|
||||
self.assertEqual(edurow.edu.destination, "testdest")
|
||||
self.assertEqual(edurow.edu.content, {"c": "d"})
|
||||
@@ -1,73 +0,0 @@
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from synapse.app.generic_worker import GenericWorkerServer
|
||||
from synapse.replication.tcp.commands import FederationAckCommand
|
||||
from synapse.replication.tcp.protocol import IReplicationConnection
|
||||
from synapse.replication.tcp.streams.federation import FederationStream
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
|
||||
class FederationAckTestCase(HomeserverTestCase):
|
||||
def default_config(self) -> dict:
|
||||
config = super().default_config()
|
||||
config["worker_app"] = "synapse.app.federation_sender"
|
||||
config["send_federation"] = False
|
||||
return config
|
||||
|
||||
def make_homeserver(self, reactor, clock):
|
||||
hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
|
||||
|
||||
return hs
|
||||
|
||||
def test_federation_ack_sent(self):
|
||||
"""A FEDERATION_ACK should be sent back after each RDATA federation
|
||||
|
||||
This test checks that the federation sender is correctly sending back
|
||||
FEDERATION_ACK messages. The test works by spinning up a federation_sender
|
||||
worker server, and then fishing out its ReplicationCommandHandler. We wire
|
||||
the RCH up to a mock connection (so that we can observe the command being sent)
|
||||
and then poke in an RDATA row.
|
||||
|
||||
XXX: it might be nice to do this by pretending to be a synapse master worker
|
||||
(or a redis server), and having the worker connect to us via a mocked-up TCP
|
||||
transport, rather than assuming that the implementation has a
|
||||
ReplicationCommandHandler.
|
||||
"""
|
||||
rch = self.hs.get_tcp_replication()
|
||||
|
||||
# wire up the ReplicationCommandHandler to a mock connection, which needs
|
||||
# to implement IReplicationConnection. (Note that Mock doesn't understand
|
||||
# interfaces, but casing an interface to a list gives the attributes.)
|
||||
mock_connection = mock.Mock(spec=list(IReplicationConnection))
|
||||
rch.new_connection(mock_connection)
|
||||
|
||||
# tell it it received an RDATA row
|
||||
self.get_success(
|
||||
rch.on_rdata(
|
||||
"federation",
|
||||
"master",
|
||||
token=10,
|
||||
rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
|
||||
)
|
||||
)
|
||||
|
||||
# now check that the FEDERATION_ACK was sent
|
||||
mock_connection.send_command.assert_called_once()
|
||||
cmd = mock_connection.send_command.call_args[0][0]
|
||||
assert isinstance(cmd, FederationAckCommand)
|
||||
self.assertEqual(cmd.token, 10)
|
||||
Reference in New Issue
Block a user