Mirror of https://github.com/element-hq/synapse.git (synced 2025-12-11 01:40:27 +00:00)

Compare commits: madlittlem... → erikj/remo... (6 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 68b9eb694f | |
| | 9f1a20f0c2 | |
| | 7575a686fd | |
| | 40bc8774c8 | |
| | 5c63b653c8 | |
| | 8f566077fb | |
changelog.d/9819.feature (new file, 1 line):

```diff
@@ -0,0 +1 @@
+Add experimental support for handling presence on a worker.
```
docs/tcp_replication.md:

```diff
@@ -216,10 +216,6 @@ Asks the server for the current position of all streams.
 
 This is used when a worker is shutting down.
 
-#### FEDERATION_ACK (C)
-
-   Acknowledge receipt of some federation data
-
 ### REMOTE_SERVER_UP (S, C)
 
 Inform other processes that a remote server may have come back online.
```
synapse/federation/send_queue.py (file deleted):

```diff
@@ -1,576 +0,0 @@
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A federation sender that forwards things to be sent across replication to
-a worker process.
-
-It assumes there is a single worker process feeding off of it.
-
-Each row in the replication stream consists of a type and some json, where the
-types indicate whether they are presence, or edus, etc.
-
-Ephemeral or non-event data are queued up in-memory. When the worker requests
-updates since a particular point, all in-memory data since before that point is
-dropped. We also expire things in the queue after 5 minutes, to ensure that a
-dead worker doesn't cause the queues to grow limitlessly.
-
-Events are replicated via a separate events stream.
-"""
-
-import logging
-from collections import namedtuple
-from typing import (
-    TYPE_CHECKING,
-    Dict,
-    Hashable,
-    Iterable,
-    List,
-    Optional,
-    Sized,
-    Tuple,
-    Type,
-)
-
-from sortedcontainers import SortedDict
-
-from synapse.api.presence import UserPresenceState
-from synapse.federation.sender import AbstractFederationSender, FederationSender
-from synapse.metrics import LaterGauge
-from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
-from synapse.util.metrics import Measure
-
-from .units import Edu
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class FederationRemoteSendQueue(AbstractFederationSender):
-    """A drop in replacement for FederationSender"""
-
-    def __init__(self, hs: "HomeServer"):
-        self.server_name = hs.hostname
-        self.clock = hs.get_clock()
-        self.notifier = hs.get_notifier()
-        self.is_mine_id = hs.is_mine_id
-
-        # We may have multiple federation sender instances, so we need to track
-        # their positions separately.
-        self._sender_instances = hs.config.worker.federation_shard_config.instances
-        self._sender_positions = {}  # type: Dict[str, int]
-
-        # Pending presence map user_id -> UserPresenceState
-        self.presence_map = {}  # type: Dict[str, UserPresenceState]
-
-        # Stream position -> list[user_id]
-        self.presence_changed = SortedDict()  # type: SortedDict[int, List[str]]
-
-        # Stores the destinations we need to explicitly send presence to about a
-        # given user.
-        # Stream position -> (user_id, destinations)
-        self.presence_destinations = (
-            SortedDict()
-        )  # type: SortedDict[int, Tuple[str, Iterable[str]]]
-
-        # (destination, key) -> EDU
-        self.keyed_edu = {}  # type: Dict[Tuple[str, tuple], Edu]
-
-        # stream position -> (destination, key)
-        self.keyed_edu_changed = (
-            SortedDict()
-        )  # type: SortedDict[int, Tuple[str, tuple]]
-
-        self.edus = SortedDict()  # type: SortedDict[int, Edu]
-
-        # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
-        self.pos = 1
-
-        # map from stream ID to the time that stream entry was generated, so that we
-        # can clear out entries after a while
-        self.pos_time = SortedDict()  # type: SortedDict[int, int]
-
-        # EVERYTHING IS SAD. In particular, python only makes new scopes when
-        # we make a new function, so we need to make a new function so the inner
-        # lambda binds to the queue rather than to the name of the queue which
-        # changes. ARGH.
-        def register(name: str, queue: Sized) -> None:
-            LaterGauge(
-                "synapse_federation_send_queue_%s_size" % (queue_name,),
-                "",
-                [],
-                lambda: len(queue),
-            )
-
-        for queue_name in [
-            "presence_map",
-            "presence_changed",
-            "keyed_edu",
-            "keyed_edu_changed",
-            "edus",
-            "pos_time",
-            "presence_destinations",
-        ]:
-            register(queue_name, getattr(self, queue_name))
-
-        self.clock.looping_call(self._clear_queue, 30 * 1000)
```
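The `register` indirection above works around Python's late-binding closures: a `lambda` captures the variable, not its value, so without the extra function scope every gauge would measure whichever queue the loop variable pointed at last. A standalone demonstration (not Synapse code) of the pitfall the "EVERYTHING IS SAD" comment describes:

```python
queues = {"a": [1], "b": [1, 2], "c": [1, 2, 3]}

# Broken: each lambda closes over the *name* `queue`, so after the loop every
# gauge reports the length of the last queue iterated over.
broken = []
for queue in queues.values():
    broken.append(lambda: len(queue))
assert [g() for g in broken] == [3, 3, 3]

# Fixed: calling a function creates a fresh scope per queue (as register()
# does above), so each lambda binds its own queue.
def make_gauge(q):
    return lambda: len(q)

fixed = [make_gauge(q) for q in queues.values()]
assert [g() for g in fixed] == [1, 2, 3]
```

The deleted file continues: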
```diff
-    def _next_pos(self) -> int:
-        pos = self.pos
-        self.pos += 1
-        self.pos_time[self.clock.time_msec()] = pos
-        return pos
-
-    def _clear_queue(self) -> None:
-        """Clear the queues for anything older than N minutes"""
-
-        FIVE_MINUTES_AGO = 5 * 60 * 1000
-        now = self.clock.time_msec()
-
-        keys = self.pos_time.keys()
-        time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
-        if not keys[:time]:
-            return
-
-        position_to_delete = max(keys[:time])
-        for key in keys[:time]:
-            del self.pos_time[key]
-
-        self._clear_queue_before_pos(position_to_delete)
```
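`_clear_queue` relies on `SortedDict` keeping `pos_time` ordered by key, so a single `bisect_left` splits off everything older than the five-minute cutoff. A minimal sketch (not Synapse code; timestamps invented) of that expiry step:

```python
from sortedcontainers import SortedDict

# pos_time here maps insertion time (ms) -> stream position, as populated by
# _next_pos above. bisect_left gives the count of entries older than the cutoff.
pos_time = SortedDict({1_000: 1, 2_000: 2, 9_000: 3})
FIVE_MINUTES_MS = 5 * 60 * 1000
now = 302_500  # pretend clock.time_msec()

i = pos_time.bisect_left(now - FIVE_MINUTES_MS)  # cutoff = 2_500 -> i == 2
expired_times = list(pos_time.keys()[:i])        # [1_000, 2_000]

if expired_times:
    # Everything queued under these expired entries can be dropped.
    latest_expired = max(expired_times)
    for t in expired_times:
        del pos_time[t]

assert latest_expired == 2_000 and list(pos_time) == [9_000]
```

The deleted file continues: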
```diff
-    def _clear_queue_before_pos(self, position_to_delete: int) -> None:
-        """Clear all the queues from before a given position"""
-        with Measure(self.clock, "send_queue._clear"):
-            # Delete things out of presence maps
-            keys = self.presence_changed.keys()
-            i = self.presence_changed.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.presence_changed[key]
-
-            user_ids = {
-                user_id for uids in self.presence_changed.values() for user_id in uids
-            }
-
-            keys = self.presence_destinations.keys()
-            i = self.presence_destinations.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.presence_destinations[key]
-
-            user_ids.update(
-                user_id for user_id, _ in self.presence_destinations.values()
-            )
-
-            to_del = [
-                user_id for user_id in self.presence_map if user_id not in user_ids
-            ]
-            for user_id in to_del:
-                del self.presence_map[user_id]
-
-            # Delete things out of keyed edus
-            keys = self.keyed_edu_changed.keys()
-            i = self.keyed_edu_changed.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.keyed_edu_changed[key]
-
-            live_keys = set()
-            for edu_key in self.keyed_edu_changed.values():
-                live_keys.add(edu_key)
-
-            keys_to_del = [
-                edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
-            ]
-            for edu_key in keys_to_del:
-                del self.keyed_edu[edu_key]
-
-            # Delete things out of edu map
-            keys = self.edus.keys()
-            i = self.edus.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.edus[key]
-
-    def notify_new_events(self, max_token: RoomStreamToken) -> None:
-        """As per FederationSender"""
-        # This should never get called.
-        raise NotImplementedError()
-
-    def build_and_send_edu(
-        self,
-        destination: str,
-        edu_type: str,
-        content: JsonDict,
-        key: Optional[Hashable] = None,
-    ) -> None:
-        """As per FederationSender"""
-        if destination == self.server_name:
-            logger.info("Not sending EDU to ourselves")
-            return
-
-        pos = self._next_pos()
-
-        edu = Edu(
-            origin=self.server_name,
-            destination=destination,
-            edu_type=edu_type,
-            content=content,
-        )
-
-        if key:
-            assert isinstance(key, tuple)
-            self.keyed_edu[(destination, key)] = edu
-            self.keyed_edu_changed[pos] = (destination, key)
-        else:
-            self.edus[pos] = edu
-
-        self.notifier.on_new_replication_data()
-
-    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
-        """As per FederationSender
-
-        Args:
-            receipt:
-        """
-        # nothing to do here: the replication listener will handle it.
-
-    def send_presence(self, states: List[UserPresenceState]) -> None:
-        """As per FederationSender
-
-        Args:
-            states
-        """
-        pos = self._next_pos()
-
-        # We only want to send presence for our own users, so lets always just
-        # filter here just in case.
-        local_states = [s for s in states if self.is_mine_id(s.user_id)]
-
-        self.presence_map.update({state.user_id: state for state in local_states})
-        self.presence_changed[pos] = [state.user_id for state in local_states]
-
-        self.notifier.on_new_replication_data()
-
-    def send_presence_to_destinations(
-        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
-    ) -> None:
-        """As per FederationSender
-
-        Args:
-            states
-            destinations
-        """
-        for state in states:
-            pos = self._next_pos()
-            self.presence_map.update({state.user_id: state for state in states})
-            self.presence_destinations[pos] = (state.user_id, destinations)
-
-        self.notifier.on_new_replication_data()
-
-    def send_device_messages(self, destination: str) -> None:
-        """As per FederationSender"""
-        # We don't need to replicate this as it gets sent down a different
-        # stream.
-
-    def wake_destination(self, server: str) -> None:
-        pass
-
-    def get_current_token(self) -> int:
-        return self.pos - 1
-
-    def federation_ack(self, instance_name: str, token: int) -> None:
-        if self._sender_instances:
-            # If we have configured multiple federation sender instances we need
-            # to track their positions separately, and only clear the queue up
-            # to the token all instances have acked.
-            self._sender_positions[instance_name] = token
-            token = min(self._sender_positions.values())
-
-        self._clear_queue_before_pos(token)
```
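With several federation sender instances, `federation_ack` only trims the queue up to the lowest token any instance has acknowledged, so a lagging sender never loses rows. A toy illustration (not Synapse code):

```python
sender_positions = {}

def ack(instance_name: str, token: int) -> int:
    """Record an instance's ack and return how far the queue may be cleared."""
    sender_positions[instance_name] = token
    return min(sender_positions.values())

assert ack("sender1", 10) == 10
assert ack("sender2", 7) == 7    # sender2 lags: clear only up to 7
assert ack("sender1", 15) == 7   # still bounded by sender2
assert ack("sender2", 20) == 15  # now sender1 is the laggard
```

The deleted file continues: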
```diff
-    async def get_replication_rows(
-        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
-    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
-        """Get rows to be sent over federation between the two tokens
-
-        Args:
-            instance_name: the name of the current process
-            from_token: the previous stream token: the starting point for fetching the
-                updates
-            to_token: the new stream token: the point to get updates up to
-            target_row_count: a target for the number of rows to be returned.
-
-        Returns: a triplet `(updates, new_last_token, limited)`, where:
-            * `updates` is a list of `(token, row)` entries.
-            * `new_last_token` is the new position in stream.
-            * `limited` is whether there are more updates to fetch.
-        """
-        # TODO: Handle target_row_count.
-
-        # To handle restarts where we wrap around
-        if from_token > self.pos:
-            from_token = -1
-
-        # list of tuple(int, BaseFederationRow), where the first is the position
-        # of the federation stream.
-        rows = []  # type: List[Tuple[int, BaseFederationRow]]
-
-        # Fetch changed presence
-        i = self.presence_changed.bisect_right(from_token)
-        j = self.presence_changed.bisect_right(to_token) + 1
-        dest_user_ids = [
-            (pos, user_id)
-            for pos, user_id_list in self.presence_changed.items()[i:j]
-            for user_id in user_id_list
-        ]
-
-        for (key, user_id) in dest_user_ids:
-            rows.append((key, PresenceRow(state=self.presence_map[user_id])))
-
-        # Fetch presence to send to destinations
-        i = self.presence_destinations.bisect_right(from_token)
-        j = self.presence_destinations.bisect_right(to_token) + 1
-
-        for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
-            rows.append(
-                (
-                    pos,
-                    PresenceDestinationsRow(
-                        state=self.presence_map[user_id], destinations=list(dests)
-                    ),
-                )
-            )
-
-        # Fetch changes keyed edus
-        i = self.keyed_edu_changed.bisect_right(from_token)
-        j = self.keyed_edu_changed.bisect_right(to_token) + 1
-        # We purposefully clobber based on the key here, python dict comprehensions
-        # always use the last value, so this will correctly point to the last
-        # stream position.
-        keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
-
-        for ((destination, edu_key), pos) in keyed_edus.items():
-            rows.append(
-                (
-                    pos,
-                    KeyedEduRow(
-                        key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
-                    ),
-                )
-            )
-
-        # Fetch changed edus
-        i = self.edus.bisect_right(from_token)
-        j = self.edus.bisect_right(to_token) + 1
-        edus = self.edus.items()[i:j]
-
-        for (pos, edu) in edus:
-            rows.append((pos, EduRow(edu)))
-
-        # Sort rows based on pos
-        rows.sort()
-
-        return (
-            [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
-            to_token,
-            False,
-        )
```
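The `{v: k for k, v in ...}` inversion above is what implements the clobbering: when the same `(destination, key)` pair appears at several stream positions, the comprehension keeps only the last position seen. A small demo with invented keys (not Synapse code):

```python
# stream position -> (destination, edu key), already in position order as a
# SortedDict slice would be; later items overwrite earlier ones on inversion.
keyed_edu_changed = {
    1: ("remote.example.com", ("typing", "!room:a")),
    2: ("remote.example.com", ("typing", "!room:b")),
    5: ("remote.example.com", ("typing", "!room:a")),  # supersedes pos 1
}

keyed_edus = {v: k for k, v in sorted(keyed_edu_changed.items())}

assert keyed_edus == {
    ("remote.example.com", ("typing", "!room:a")): 5,
    ("remote.example.com", ("typing", "!room:b")): 2,
}
```

The deleted file continues: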
```diff
-class BaseFederationRow:
-    """Base class for rows to be sent in the federation stream.
-
-    Specifies how to identify, serialize and deserialize the different types.
-    """
-
-    TypeId = ""  # Unique string that ids the type. Must be overridden in sub classes.
-
-    @staticmethod
-    def from_data(data):
-        """Parse the data from the federation stream into a row.
-
-        Args:
-            data: The value of ``data`` from FederationStreamRow.data, type
-                depends on the type of stream
-        """
-        raise NotImplementedError()
-
-    def to_data(self):
-        """Serialize this row to be sent over the federation stream.
-
-        Returns:
-            The value to be sent in FederationStreamRow.data. The type depends
-            on the type of stream.
-        """
-        raise NotImplementedError()
-
-    def add_to_buffer(self, buff):
-        """Add this row to the appropriate field in the buffer ready for this
-        to be sent over federation.
-
-        We use a buffer so that we can batch up events that have come in at
-        the same time and send them all at once.
-
-        Args:
-            buff (BufferedToSend)
-        """
-        raise NotImplementedError()
-
-
-class PresenceRow(
-    BaseFederationRow, namedtuple("PresenceRow", ("state",))  # UserPresenceState
-):
-    TypeId = "p"
-
-    @staticmethod
-    def from_data(data):
-        return PresenceRow(state=UserPresenceState.from_dict(data))
-
-    def to_data(self):
-        return self.state.as_dict()
-
-    def add_to_buffer(self, buff):
-        buff.presence.append(self.state)
-
-
-class PresenceDestinationsRow(
-    BaseFederationRow,
-    namedtuple(
-        "PresenceDestinationsRow",
-        ("state", "destinations"),  # UserPresenceState  # list[str]
-    ),
-):
-    TypeId = "pd"
-
-    @staticmethod
-    def from_data(data):
-        return PresenceDestinationsRow(
-            state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
-        )
-
-    def to_data(self):
-        return {"state": self.state.as_dict(), "dests": self.destinations}
-
-    def add_to_buffer(self, buff):
-        buff.presence_destinations.append((self.state, self.destinations))
-
-
-class KeyedEduRow(
-    BaseFederationRow,
-    namedtuple(
-        "KeyedEduRow",
-        ("key", "edu"),  # tuple(str) - the edu key passed to send_edu  # Edu
-    ),
-):
-    """Streams EDUs that have an associated key that is used to clobber. For example,
-    typing EDUs clobber based on room_id.
-    """
-
-    TypeId = "k"
-
-    @staticmethod
-    def from_data(data):
-        return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))
-
-    def to_data(self):
-        return {"key": self.key, "edu": self.edu.get_internal_dict()}
-
-    def add_to_buffer(self, buff):
-        buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
-
-
-class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
-    """Streams EDUs that don't have keys. See KeyedEduRow"""
-
-    TypeId = "e"
-
-    @staticmethod
-    def from_data(data):
-        return EduRow(Edu(**data))
-
-    def to_data(self):
-        return self.edu.get_internal_dict()
-
-    def add_to_buffer(self, buff):
-        buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-
-
-_rowtypes = (
-    PresenceRow,
-    PresenceDestinationsRow,
-    KeyedEduRow,
-    EduRow,
-)  # type: Tuple[Type[BaseFederationRow], ...]
-
-TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
-
-
-ParsedFederationStreamData = namedtuple(
-    "ParsedFederationStreamData",
-    (
-        "presence",  # list(UserPresenceState)
-        "presence_destinations",  # list of tuples of UserPresenceState and destinations
-        "keyed_edus",  # dict of destination -> { key -> Edu }
-        "edus",  # dict of destination -> [Edu]
-    ),
-)
-
-
-def process_rows_for_federation(
-    transaction_queue: FederationSender,
-    rows: List[FederationStream.FederationStreamRow],
-) -> None:
-    """Parse a list of rows from the federation stream and put them in the
-    transaction queue ready for sending to the relevant homeservers.
-
-    Args:
-        transaction_queue
-        rows
-    """
-
-    # The federation stream contains a bunch of different types of
-    # rows that need to be handled differently. We parse the rows, put
-    # them into the appropriate collection and then send them off.
-
-    buff = ParsedFederationStreamData(
-        presence=[],
-        presence_destinations=[],
-        keyed_edus={},
-        edus={},
-    )
-
-    # Parse the rows in the stream and add to the buffer
-    for row in rows:
-        if row.type not in TypeToRow:
-            logger.error("Unrecognized federation row type %r", row.type)
-            continue
-
-        RowType = TypeToRow[row.type]
-        parsed_row = RowType.from_data(row.data)
-        parsed_row.add_to_buffer(buff)
-
-    if buff.presence:
-        transaction_queue.send_presence(buff.presence)
-
-    for state, destinations in buff.presence_destinations:
-        transaction_queue.send_presence_to_destinations(
-            states=[state], destinations=destinations
-        )
-
-    for destination, edu_map in buff.keyed_edus.items():
-        for key, edu in edu_map.items():
-            transaction_queue.send_edu(edu, key)
-
-    for destination, edu_list in buff.edus.items():
-        for edu in edu_list:
-            transaction_queue.send_edu(edu, None)
```
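For reference, a hedged round-trip sketch of the `TypeId` dispatch the deleted file implemented: each row type serializes itself with `to_data()`, and `TypeToRow` routes incoming `(type, data)` pairs back through `from_data()`. `Edu` here is a stand-in namedtuple, not `synapse.federation.units.Edu`:

```python
from collections import namedtuple

Edu = namedtuple("Edu", ("origin", "destination", "edu_type", "content"))

class EduRow(namedtuple("EduRow", ("edu",))):
    TypeId = "e"

    @staticmethod
    def from_data(data):
        return EduRow(Edu(**data))

    def to_data(self):
        return self.edu._asdict()

TypeToRow = {Row.TypeId: Row for Row in (EduRow,)}

edu = Edu("hs1.test", "hs2.test", "m.receipt", {"!room:a": {}})
wire = (EduRow.TypeId, EduRow(edu).to_data())   # what goes onto the stream
parsed = TypeToRow[wire[0]].from_data(wire[1])  # what the worker rebuilds
assert parsed.edu == edu
```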
synapse/federation/sender/__init__.py:

```diff
@@ -24,8 +24,6 @@ from synapse.events import EventBase
 from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
 from synapse.federation.units import Edu
-from synapse.handlers.presence import get_interested_remotes
-from synapse.logging.context import preserve_fn
 from synapse.metrics import (
     LaterGauge,
     event_processing_loop_counter,
@@ -34,7 +32,7 @@ from synapse.metrics import (
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
-from synapse.util.metrics import Measure, measure_func
+from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.events.presence_router import PresenceRouter
@@ -79,15 +77,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
         """
         raise NotImplementedError()
 
-    @abc.abstractmethod
-    def send_presence(self, states: List[UserPresenceState]) -> None:
-        """Send the new presence states to the appropriate destinations.
-
-        This actually queues up the presence states ready for sending and
-        triggers a background task to process them and send out the transactions.
-        """
-        raise NotImplementedError()
-
     @abc.abstractmethod
     def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
@@ -134,10 +123,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
     def get_current_token(self) -> int:
         raise NotImplementedError()
 
-    @abc.abstractmethod
-    def federation_ack(self, instance_name: str, token: int) -> None:
-        raise NotImplementedError()
-
     @abc.abstractmethod
     async def get_replication_rows(
         self, instance_name: str, from_token: int, to_token: int, target_row_count: int
@@ -176,11 +161,6 @@ class FederationSender(AbstractFederationSender):
             ),
         )
 
-        # Map of user_id -> UserPresenceState for all the pending presence
-        # to be sent out by user_id. Entries here get processed and put in
-        # pending_presence_by_dest
-        self.pending_presence = {}  # type: Dict[str, UserPresenceState]
-
         LaterGauge(
             "synapse_federation_transaction_queue_pending_pdus",
             "",
@@ -201,8 +181,6 @@ class FederationSender(AbstractFederationSender):
         self._is_processing = False
         self._last_poked_id = -1
 
-        self._processing_pending_presence = False
-
         # map from room_id to a set of PerDestinationQueues which we believe are
         # awaiting a call to flush_read_receipts_for_room. The presence of an entry
         # here for a given room means that we are rate-limiting RR flushes to that room,
@@ -546,48 +524,6 @@ class FederationSender(AbstractFederationSender):
         for queue in queues:
             queue.flush_read_receipts_for_room(room_id)
 
-    @preserve_fn  # the caller should not yield on this
-    async def send_presence(self, states: List[UserPresenceState]) -> None:
-        """Send the new presence states to the appropriate destinations.
-
-        This actually queues up the presence states ready for sending and
-        triggers a background task to process them and send out the transactions.
-        """
-        if not self.hs.config.use_presence:
-            # No-op if presence is disabled.
-            return
-
-        # First we queue up the new presence by user ID, so multiple presence
-        # updates in quick succession are correctly handled.
-        # We only want to send presence for our own users, so lets always just
-        # filter here just in case.
-        self.pending_presence.update(
-            {state.user_id: state for state in states if self.is_mine_id(state.user_id)}
-        )
-
-        # We then handle the new pending presence in batches, first figuring
-        # out the destinations we need to send each state to and then poking it
-        # to attempt a new transaction. We linearize this so that we don't
-        # accidentally mess up the ordering and send multiple presence updates
-        # in the wrong order
-        if self._processing_pending_presence:
-            return
-
-        self._processing_pending_presence = True
-        try:
-            while True:
-                states_map = self.pending_presence
-                self.pending_presence = {}
-
-                if not states_map:
-                    break
-
-                await self._process_presence_inner(list(states_map.values()))
-        except Exception:
-            logger.exception("Error sending presence states to servers")
-        finally:
-            self._processing_pending_presence = False
-
     def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
     ) -> None:
@@ -608,40 +544,6 @@ class FederationSender(AbstractFederationSender):
                 continue
             self._get_per_destination_queue(destination).send_presence(states)
 
-    @measure_func("txnqueue._process_presence")
-    async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
-        """Given a list of states populate self.pending_presence_by_dest and
-        poke to send a new transaction to each destination
-        """
-        # We pull the presence router here instead of __init__
-        # to prevent a dependency cycle:
-        #
-        # AuthHandler -> Notifier -> FederationSender
-        # -> PresenceRouter -> ModuleApi -> AuthHandler
-        if self._presence_router is None:
-            self._presence_router = self.hs.get_presence_router()
-
-        assert self._presence_router is not None
-
-        hosts_and_states = await get_interested_remotes(
-            self.store,
-            self._presence_router,
-            states,
-            self.state,
-        )
-
-        for destinations, states in hosts_and_states:
-            for destination in destinations:
-                if destination == self.server_name:
-                    continue
-
-                if not self._federation_shard_config.should_handle(
-                    self._instance_name, destination
-                ):
-                    continue
-
-                self._get_per_destination_queue(destination).send_presence(states)
-
     def build_and_send_edu(
         self,
         destination: str,
@@ -729,10 +631,6 @@ class FederationSender(AbstractFederationSender):
         # to a worker.
         return 0
 
-    def federation_ack(self, instance_name: str, token: int) -> None:
-        # It is not expected that this gets called on FederationSender.
-        raise NotImplementedError()
-
     @staticmethod
     async def get_replication_rows(
         instance_name: str, from_token: int, to_token: int, target_row_count: int
```
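The removed `send_presence`/`_process_presence_inner` pair implemented a linearized drain: concurrent callers merge updates into a pending map keyed by user, and a single loop (guarded by `_processing_pending_presence`) sends batches in order. A minimal sketch of that pattern (not Synapse code):

```python
import asyncio

class PresenceBatcher:
    def __init__(self):
        self.pending = {}        # user_id -> latest state; newer updates clobber
        self.processing = False  # ensures at most one drain loop runs
        self.sent_batches = []

    async def send_presence(self, states: dict) -> None:
        self.pending.update(states)
        if self.processing:      # another caller's loop will pick these up
            return
        self.processing = True
        try:
            while True:
                batch, self.pending = self.pending, {}
                if not batch:
                    break
                await asyncio.sleep(0)  # stand-in for the real network send
                self.sent_batches.append(batch)
        finally:
            self.processing = False

async def main():
    b = PresenceBatcher()
    await asyncio.gather(
        b.send_presence({"@a:hs": "online"}),
        b.send_presence({"@b:hs": "unavailable"}),
    )
    print(b.sent_batches)  # both updates sent, in order, by a single loop

asyncio.run(main())
```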
synapse/handlers/device.py:

```diff
@@ -484,7 +484,7 @@ class DeviceHandler(DeviceWorkerHandler):
             "device_list_key", position, users=[user_id], rooms=room_ids
         )
 
-        if hosts:
+        if hosts and self.federation_sender:
             logger.info(
                 "Sending device list update notif for %r to: %r", user_id, hosts
             )
```
synapse/handlers/devicemessage.py:

```diff
@@ -51,9 +51,7 @@ class DeviceMessageHandler:
         # same instance. Other federation sender instances will get notified by
         # `synapse.app.generic_worker.FederationSenderHandler` when it sees it
         # in the to-device replication stream.
-        self.federation_sender = None
-        if hs.should_send_federation():
-            self.federation_sender = hs.get_federation_sender()
+        self.federation_sender = hs.get_federation_sender()
 
         # If we can handle the to device EDUs we do so, otherwise we route them
         # to the appropriate worker.
```
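This hunk, and the similar ones below for receipts, typing, and the notifier, drop the `hs.should_send_federation()` guard from each call site; the accessor itself is now expected to return `None` on processes that don't send federation, with callers checking the attribute instead (as in the device handler's `if hosts and self.federation_sender:` above). A hedged sketch of that pattern — names illustrative, not the real `HomeServer` implementation:

```python
from typing import Optional

class FederationSender:
    def send_device_messages(self, destination: str) -> None:
        print("sending to", destination)

class HomeServer:
    def __init__(self, sends_federation: bool):
        self._sends_federation = sends_federation

    def should_send_federation(self) -> bool:
        return self._sends_federation

    def get_federation_sender(self) -> Optional["FederationSender"]:
        # The None/not-None decision lives here, once, instead of at every call site.
        return FederationSender() if self.should_send_federation() else None

sender = HomeServer(sends_federation=False).get_federation_sender()
if sender:  # the new guard at each call site
    sender.send_device_messages("remote.example.com")
```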
synapse/handlers/presence.py:

```diff
@@ -24,6 +24,7 @@ The methods that define policy are:
 import abc
 import contextlib
 import logging
+from bisect import bisect
 from contextlib import contextmanager
 from typing import (
     TYPE_CHECKING,
@@ -53,7 +54,9 @@ from synapse.replication.http.presence import (
     ReplicationBumpPresenceActiveTime,
     ReplicationPresenceSetState,
 )
+from synapse.replication.http.streams import ReplicationGetStreamUpdates
 from synapse.replication.tcp.commands import ClearUserSyncsCommand
+from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
 from synapse.state import StateHandler
 from synapse.storage.databases.main import DataStore
 from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
@@ -124,6 +127,8 @@ class BasePresenceHandler(abc.ABC):
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
 
+        self.federation_queue = PresenceFederationQueue(hs, self)
+
         self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
 
         active_presence = self.store.take_presence_startup_info()
@@ -245,9 +250,17 @@ class BasePresenceHandler(abc.ABC):
         """
         pass
 
-    async def process_replication_rows(self, token, rows):
+    async def process_replication_rows(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
         """Process presence stream rows received over replication."""
-        pass
+        await self.federation_queue.process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
+
+    def get_federation_queue(self) -> "PresenceFederationQueue":
+        """Get the presence federation queue, if any."""
+        return self.federation_queue
 
 
 class _NullContextManager(ContextManager[None]):
@@ -265,6 +278,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
 
         self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
+        self.state = hs.get_state_handler()
 
         # The number of ongoing syncs on this process, by user id.
         # Empty if _presence_enabled is false.
@@ -273,6 +287,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
         self.notifier = hs.get_notifier()
         self.instance_id = hs.get_instance_id()
 
+        self._federation = hs.get_federation_sender()
+
         # user_id -> last_sync_ms. Lists the users that have stopped syncing
         # but we haven't notified the master of that yet
         self.users_going_offline = {}
@@ -388,7 +404,14 @@ class WorkerPresenceHandler(BasePresenceHandler):
             users=users_to_states.keys(),
         )
 
-    async def process_replication_rows(self, token, rows):
+    async def process_replication_rows(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
+        await super().process_replication_rows(stream_name, instance_name, token, rows)
+
+        if stream_name != PresenceStream.NAME:
+            return
+
         states = [
             UserPresenceState(
                 row.user_id,
@@ -408,6 +431,20 @@ class WorkerPresenceHandler(BasePresenceHandler):
         stream_id = token
         await self.notify_from_replication(states, stream_id)
 
+        # Handle poking the local federation sender, if there is one.
+        if not self._federation:
+            return
+
+        hosts_and_states = await get_interested_remotes(
+            self.store,
+            self.presence_router,
+            states,
+            self.state,
+        )
+
+        for destinations, states in hosts_and_states:
+            self._federation.send_presence_to_destinations(states, destinations)
+
     def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
         return [
             user_id
```
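The loops added here consume the `(destinations, states)` pairs produced by `get_interested_remotes` (the final presence hunk below shows it appending `([host], states)` pairs). Illustrative-only data showing the shape — real entries hold `UserPresenceState` objects, not strings:

```python
from typing import List, Tuple

hosts_and_states: List[Tuple[List[str], List[str]]] = [
    (["hs2.example.com", "hs3.example.com"], ["@alice:hs1 online"]),
    (["hs4.example.com"], ["@bob:hs1 unavailable"]),
]

for destinations, states in hosts_and_states:
    # stand-in for federation.send_presence_to_destinations(states, destinations)
    print(destinations, "<-", states)
```

The presence handler hunks continue: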
```diff
@@ -463,11 +500,12 @@ class PresenceHandler(BasePresenceHandler):
         self.server_name = hs.hostname
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
-        self.federation = hs.get_federation_sender()
         self.state = hs.get_state_handler()
         self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
+        self.federation_sender = hs.get_federation_sender()
+
         federation_registry = hs.get_federation_registry()
 
         federation_registry.register_edu_handler("m.presence", self.incoming_presence)
@@ -680,7 +718,17 @@ class PresenceHandler(BasePresenceHandler):
         if to_federation_ping:
             federation_presence_out_counter.inc(len(to_federation_ping))
 
-            self._push_to_remotes(to_federation_ping.values())
+            hosts_and_states = await get_interested_remotes(
+                self.store,
+                self.presence_router,
+                list(to_federation_ping.values()),
+                self.state,
+            )
+
+            for destinations, states in hosts_and_states:
+                self.federation_queue.send_presence_to_destinations(
+                    states, destinations
+                )
 
     async def _handle_timeouts(self):
         """Checks the presence of users that have timed out and updates as
@@ -920,15 +968,21 @@ class PresenceHandler(BasePresenceHandler):
             users=[UserID.from_string(u) for u in users_to_states],
         )
 
-        self._push_to_remotes(states)
-
-    def _push_to_remotes(self, states):
-        """Sends state updates to remote servers.
-
-        Args:
-            states (list(UserPresenceState))
-        """
-        self.federation.send_presence(states)
+        # We only need to tell the local federation sender, if any, that new
+        # presence has happened. Other federation senders will get notified via
+        # the presence replication stream.
+        if not self.federation_sender:
+            return
+
+        hosts_and_states = await get_interested_remotes(
+            self.store,
+            self.presence_router,
+            states,
+            self.state,
+        )
+
+        for destinations, states in hosts_and_states:
+            self.federation_sender.send_presence_to_destinations(states, destinations)
 
     async def incoming_presence(self, origin, content):
         """Called when we receive a `m.presence` EDU from a remote server."""
@@ -1166,7 +1220,7 @@ class PresenceHandler(BasePresenceHandler):
 
         # Send out user presence updates for each destination
         for destination, user_state_set in presence_destinations.items():
-            self.federation.send_presence_to_destinations(
+            self.federation_queue.send_presence_to_destinations(
                 destinations=[destination], states=user_state_set
             )
@@ -1811,3 +1865,171 @@ async def get_interested_remotes(
             hosts_and_states.append(([host], states))
 
     return hosts_and_states
+
+
+class PresenceFederationQueue:
+    """Handles sending ad hoc presence updates over federation, which are *not*
+    due to state updates (that get handled via the presence stream), e.g.
+    federation pings and sending existing present states to newly joined hosts.
+
+    Only the last N minutes will be queued, so if a federation sender instance
+    is down for longer then some updates will be dropped. This is OK as presence
+    is ephemeral, and so it will self correct eventually.
+    """
+
+    # How long to keep entries in the queue for. Workers that are down for
+    # longer than this duration will miss out on older updates.
+    _KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000
+
+    # How often to check if we can expire entries from the queue.
+    _CLEAR_ITEMS_EVERY_MS = 60 * 1000
+
+    def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
+        self._clock = hs.get_clock()
+        self._notifier = hs.get_notifier()
+        self._instance_name = hs.get_instance_name()
+        self._presence_handler = presence_handler
+        self._repl_client = ReplicationGetStreamUpdates.make_client(hs)
+
+        # Should we keep a queue of recent presence updates? We only bother if
+        # another process may be handling federation sending.
+        self._queue_presence_updates = True
+
+        # The federation sender if this instance is a federation sender.
+        self._federation = hs.get_federation_sender()
+
+        if self._federation:
+            # We don't bother queuing up presence states if only this instance
+            # is sending federation.
+            if hs.config.worker.federation_shard_config.instances == [
+                self._instance_name
+            ]:
+                self._queue_presence_updates = False
+
+        # The queue of recently queued updates as tuples of: `(timestamp,
+        # stream_id, destinations, user_ids)`. We don't store the full states
+        # for efficiency, and remote workers will already have the full states
+        # cached.
+        self._queue = []  # type: List[Tuple[int, int, Collection[str], Set[str]]]
+
+        self._next_id = 1
+
+        # Map from instance name to current token
+        self._current_tokens = {}  # type: Dict[str, int]
+
+        if self._queue_presence_updates:
+            self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS)
+
+    def _clear_queue(self):
+        """Clear out older entries from the queue."""
+        clear_before = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS
+
+        # The queue is sorted by timestamp, so we can bisect to find the right
+        # place to purge before. Note that we are searching using a 1-tuple with
+        # the time, which does The Right Thing since the queue is a tuple where
+        # the first item is a timestamp.
+        index = bisect(self._queue, (clear_before,))
+        self._queue = self._queue[index:]
+
+    def send_presence_to_destinations(
+        self, states: Collection[UserPresenceState], destinations: Collection[str]
+    ) -> None:
+        """Send the presence states to the given destinations.
+
+        Will forward to the local federation sender (if there is one) and queue
+        to send over replication (if there are other federation sender instances.).
+        """
+
+        if self._federation:
+            self._federation.send_presence_to_destinations(states, destinations)
+
+        if not self._queue_presence_updates:
+            return
+
+        now = self._clock.time_msec()
+
+        stream_id = self._next_id
+        self._next_id += 1
+
+        self._queue.append((now, stream_id, destinations, {s.user_id for s in states}))
+
+        self._notifier.notify_replication()
+
+    def get_current_token(self, instance_name: str) -> int:
+        if instance_name == self._instance_name:
+            return self._next_id - 1
+        else:
+            return self._current_tokens.get(instance_name, 0)
+
+    async def get_replication_rows(
+        self,
+        instance_name: str,
+        from_token: int,
+        upto_token: int,
+        target_row_count: int,
+    ) -> Tuple[List[Tuple[int, Tuple[str, str]]], int, bool]:
+        """Get all the updates between the two tokens.
+
+        We return rows in the form of `(destination, user_id)` to keep the size
+        of each row bounded (rather than returning the sets in a row).
+        """
+        if instance_name != self._instance_name:
+            # If not local we query over replication.
+            result = await self._repl_client(
+                instance_name=instance_name,
+                stream_name=PresenceFederationStream.NAME,
+                from_token=from_token,
+                upto_token=upto_token,
+            )
+            return result["updates"], result["upto_token"], result["limited"]
+
+        # We can find the correct position in the queue by noting that there is
+        # exactly one entry per stream ID, and that the last entry has an ID of
+        # `self._next_id - 1`, so we can count backwards from the end.
+        #
+        # Since the start of the queue is periodically truncated we need to
+        # handle the case where `from_token` stream ID has already been dropped.
+        start_idx = max(from_token - self._next_id, -len(self._queue))
+
+        to_send = []  # type: List[Tuple[int, Tuple[str, str]]]
+        limited = False
+        new_id = upto_token
+        for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
+            if stream_id > upto_token:
+                break
+
+            new_id = stream_id
+
+            to_send.extend(
+                (stream_id, (destination, user_id))
+                for destination in destinations
+                for user_id in user_ids
+            )
+
+            if len(to_send) > target_row_count:
+                limited = True
+                break
+
+        return to_send, new_id, limited
+
+    async def process_replication_rows(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
+        if stream_name != PresenceFederationStream.NAME:
+            return
+
+        # We keep track of the current tokens
+        self._current_tokens[instance_name] = token
+
+        # If we're a federation sender we pull out the presence states to send
+        # and forward them on.
+        if not self._federation:
+            return
+
+        hosts_to_users = {}  # type: Dict[str, Set[str]]
+        for row in rows:
+            hosts_to_users.setdefault(row.destination, set()).add(row.user_id)
+
+        for host, user_ids in hosts_to_users.items():
+            states = await self._presence_handler.current_state_for_users(user_ids)
+            self._federation.send_presence_to_destinations(states.values(), [host])
```
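The `bisect(self._queue, (clear_before,))` call in `PresenceFederationQueue._clear_queue` works because tuples compare lexicographically, and a 1-tuple sorts before any longer tuple sharing the same first element — so queue rows with a timestamp strictly below the cutoff land before the probe. A standalone demo (not Synapse code):

```python
from bisect import bisect

# Rows are (timestamp, stream_id, destinations, user_ids), sorted by timestamp.
queue = [
    (1_000, 1, ["hs2.test"], {"@a:hs1"}),
    (2_000, 2, ["hs3.test"], {"@b:hs1"}),
    (9_000, 3, ["hs2.test"], {"@c:hs1"}),
]

clear_before = 2_500
index = bisect(queue, (clear_before,))  # rows with timestamp < 2_500 fall before the probe
queue = queue[index:]

assert queue == [(9_000, 3, ["hs2.test"], {"@c:hs1"})]
```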
synapse/handlers/receipts.py:

```diff
@@ -36,9 +36,7 @@ class ReceiptsHandler(BaseHandler):
         # same instance. Other federation sender instances will get notified by
         # `synapse.app.generic_worker.FederationSenderHandler` when it sees it
         # in the receipts stream.
-        self.federation_sender = None
-        if hs.should_send_federation():
-            self.federation_sender = hs.get_federation_sender()
+        self.federation_sender = hs.get_federation_sender()
 
         # If we can handle the receipt EDUs we do so, otherwise we route them
         # to the appropriate worker.
```
synapse/handlers/typing.py:

```diff
@@ -57,9 +57,7 @@ class FollowerTypingHandler:
         self.clock = hs.get_clock()
         self.is_mine_id = hs.is_mine_id
 
-        self.federation = None
-        if hs.should_send_federation():
-            self.federation = hs.get_federation_sender()
+        self.federation = hs.get_federation_sender()
 
         if hs.config.worker.writers.typing != hs.get_instance_name():
             hs.get_federation_registry().register_instance_for_edu(
```
synapse/module_api/__init__.py:

```diff
@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Any, Generator, Iterable, List, Optional, Tupl
 from twisted.internet import defer
 
 from synapse.events import EventBase
+from synapse.handlers.presence import get_interested_remotes
 from synapse.http.client import SimpleHttpClient
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -50,6 +51,10 @@ class ModuleApi:
         self._auth_handler = auth_handler
         self._server_name = hs.hostname
         self._presence_stream = hs.get_event_sources().sources["presence"]
+        self._state = hs.get_state_handler()
+        self._presence_router = hs.get_presence_router()
+
+        self._federation = self._hs.get_federation_sender()
 
         # We expose these as properties below in order to attach a helpful docstring.
         self._http_client = hs.get_simple_http_client()  # type: SimpleHttpClient
@@ -423,6 +428,9 @@ class ModuleApi:
                 # Force a presence initial_sync for this user next time
                 self._send_full_presence_to_local_users.add(user)
             else:
+                if not self._federation:
+                    continue
+
                 # Retrieve presence state for currently online users that this user
                 # is considered interested in
                 presence_events, _ = await self._presence_stream.get_new_events(
@@ -430,12 +438,16 @@ class ModuleApi:
                 )
 
                 # Send to remote destinations
-                await make_deferred_yieldable(
-                    # We pull the federation sender here as we can only do so on workers
-                    # that support sending presence
-                    self._hs.get_federation_sender().send_presence(presence_events)
+                hosts_and_states = await get_interested_remotes(
+                    self._store,
+                    self._presence_router,
+                    presence_events,
+                    self._state,
                 )
+
+                for destinations, states in hosts_and_states:
+                    self._federation.send_presence_to_destinations(states, destinations)
 
 
 class PublicRoomListManager:
     """Contains methods for adding to, removing from and querying whether a room
```
synapse/notifier.py:

```diff
@@ -227,9 +227,7 @@ class Notifier:
         self.appservice_handler = hs.get_application_service_handler()
         self._pusher_pool = hs.get_pusherpool()
 
-        self.federation_sender = None
-        if hs.should_send_federation():
-            self.federation_sender = hs.get_federation_sender()
+        self.federation_sender = hs.get_federation_sender()
 
         self.state_handler = hs.get_state_handler()
 
```
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
@@ -20,16 +20,13 @@ from twisted.internet.defer import Deferred
 from twisted.internet.protocol import ReconnectingClientFactory

 from synapse.api.constants import EventTypes
-from synapse.federation import send_queue
 from synapse.federation.sender import FederationSender
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
 from synapse.replication.tcp.streams import (
     AccountDataStream,
     DeviceListsStream,
     GroupServerStream,
-    PresenceStream,
     PushersStream,
     PushRulesStream,
     ReceiptsStream,
@@ -191,8 +188,6 @@ class ReplicationDataHandler:
                     self.stop_pusher(row.user_id, row.app_id, row.pushkey)
                 else:
                     await self.start_pusher(row.user_id, row.app_id, row.pushkey)
-        elif stream_name == PresenceStream.NAME:
-            await self._presence_handler.process_replication_rows(token, rows)
         elif stream_name == EventsStream.NAME:
             # We shouldn't get multiple rows per token for events stream, so
             # we don't need to optimise this for multiple rows.
@@ -221,6 +216,10 @@ class ReplicationDataHandler:
                     membership=row.data.membership,
                 )

+        await self._presence_handler.process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
+
         # Notify any waiting deferreds. The list is ordered by position so we
         # just iterate through the list until we reach a position that is
         # greater than the received row position.
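With this hunk the replication data handler stops special-casing the presence stream (see the removal further up) and instead calls into the presence handler for every batch of rows, leaving it to filter by stream name. A hedged sketch of the receiving side; the signature matches the call above, while the body and the exact set of streams handled are assumptions:

    async def process_replication_rows(self, stream_name, instance_name, token, rows):
        # Illustrative filter: ignore rows from streams this handler doesn't own.
        if stream_name not in (PresenceStream.NAME, PresenceFederationStream.NAME):
            return
        # ... update local presence state, or forward queued federation rows ...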
@@ -338,6 +337,7 @@ class FederationSenderHandler:
         self.store = hs.get_datastore()
         self._is_mine_id = hs.is_mine_id
         self._hs = hs
+        self._presence_handler = hs.get_presence_handler()

         # We need to make a temporary value to ensure that mypy picks up the
         # right type. We know we should have a federation sender instance since
@@ -356,14 +356,8 @@ class FederationSenderHandler:
             self.federation_sender.wake_destination(server)

     async def process_replication_rows(self, stream_name, token, rows):
-        # The federation stream contains things that we want to send out, e.g.
-        # presence, typing, etc.
-        if stream_name == "federation":
-            send_queue.process_rows_for_federation(self.federation_sender, rows)
-            await self.update_token(token)
-
         # ... and when new receipts happen
-        elif stream_name == ReceiptsStream.NAME:
+        if stream_name == ReceiptsStream.NAME:
             await self._on_new_receipts(rows)

         # ... as well as device updates and messages
@@ -401,54 +395,3 @@ class FederationSenderHandler:
                     receipt.data,
                 )
                 await self.federation_sender.send_read_receipt(receipt_info)
-
-    async def update_token(self, token):
-        """Update the record of where we have processed to in the federation stream.
-
-        Called after we have processed an update received over replication. Sends
-        a FEDERATION_ACK back to the master, and stores the token that we have processed
-        in `federation_stream_position` so that we can restart where we left off.
-        """
-        self.federation_position = token
-
-        # We save and send the ACK to master asynchronously, so we don't block
-        # processing on persistence. We don't need to do this operation for
-        # every single RDATA we receive, we just need to do it periodically.
-
-        if self._fed_position_linearizer.is_queued(None):
-            # There is already a task queued up to save and send the token, so
-            # no need to queue up another task.
-            return
-
-        run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
-
-    async def _save_and_send_ack(self):
-        """Save the current federation position in the database and send an ACK
-        to master with where we're up to.
-        """
-        # We should only be calling this once we've got a token.
-        assert self.federation_position is not None
-
-        try:
-            # We linearize here to ensure we don't have races updating the token
-            #
-            # XXX this appears to be redundant, since the ReplicationCommandHandler
-            # has a linearizer which ensures that we only process one line of
-            # replication data at a time. Should we remove it, or is it doing useful
-            # service for robustness? Or could we replace it with an assertion that
-            # we're not being re-entered?
-
-            with (await self._fed_position_linearizer.queue(None)):
-                # We persist and ack the same position, so we take a copy of it
-                # here as otherwise it can get modified from underneath us.
-                current_position = self.federation_position
-
-                await self.store.update_federation_out_pos(
-                    "federation", current_position
-                )
-
-                # We ACK this token over replication so that the master can drop
-                # its in memory queues
-                self._hs.get_tcp_replication().send_federation_ack(current_position)
-        except Exception:
-            logger.exception("Error updating federation stream position")
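The two deleted methods implemented a coalesced acknowledgement: every RDATA advanced an in-memory position, but the persist-and-ack work was queued at most once at a time (the is_queued check) and ran off the critical path as a background process. The same pattern in a self-contained asyncio sketch; the class and names here are invented for illustration, where Synapse used Linearizer and run_as_background_process:

import asyncio


class AckCoalescer:
    """Coalesces frequent position updates into a single pending persist+ack."""

    def __init__(self, persist_and_ack):
        self._persist_and_ack = persist_and_ack  # async callable taking a token
        self._position = None
        self._task = None

    def advance(self, token):
        self._position = token
        # Mirrors the is_queued() check above: don't pile up flush tasks.
        if self._task is None or self._task.done():
            self._task = asyncio.ensure_future(self._flush())

    async def _flush(self):
        # Copy the position so a concurrent advance() can't change it mid-flush,
        # just as the deleted code copied self.federation_position.
        current = self._position
        await self._persist_and_ack(current)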
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
@@ -297,33 +297,6 @@ class ClearUserSyncsCommand(Command):
         return self.instance_id


-class FederationAckCommand(Command):
-    """Sent by the client when it has processed up to a given point in the
-    federation stream. This allows the master to drop in-memory caches of the
-    federation stream.
-
-    This must only be sent from one worker (i.e. the one sending federation)
-
-    Format::
-
-        FEDERATION_ACK <instance_name> <token>
-    """
-
-    NAME = "FEDERATION_ACK"
-
-    def __init__(self, instance_name: str, token: int):
-        self.instance_name = instance_name
-        self.token = token
-
-    @classmethod
-    def from_line(cls, line: str) -> "FederationAckCommand":
-        instance_name, token = line.split(" ")
-        return cls(instance_name, int(token))
-
-    def to_line(self) -> str:
-        return "%s %s" % (self.instance_name, self.token)
-
-
 class UserIpCommand(Command):
     """Sent periodically when a worker sees activity from a client.

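For reference, the wire format of the command deleted above round-trips trivially; a stand-alone copy of its parsing and serialisation, kept here purely as illustration now that the command is gone:

class FederationAckCommand:
    """Stand-alone copy of the deleted command, for illustration only."""

    NAME = "FEDERATION_ACK"

    def __init__(self, instance_name: str, token: int):
        self.instance_name = instance_name
        self.token = token

    @classmethod
    def from_line(cls, line: str) -> "FederationAckCommand":
        instance_name, token = line.split(" ")
        return cls(instance_name, int(token))

    def to_line(self) -> str:
        return "%s %s" % (self.instance_name, self.token)


# Round-trip: the line "federation_sender1 1234" parses and re-serialises unchanged.
cmd = FederationAckCommand.from_line("federation_sender1 1234")
assert (cmd.instance_name, cmd.token) == ("federation_sender1", 1234)
assert cmd.to_line() == "federation_sender1 1234"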
@@ -389,7 +362,6 @@ _COMMANDS = (
     NameCommand,
     ReplicateCommand,
     UserSyncCommand,
-    FederationAckCommand,
     UserIpCommand,
     RemoteServerUpCommand,
     ClearUserSyncsCommand,
@@ -415,7 +387,6 @@ VALID_CLIENT_COMMANDS = (
    PingCommand.NAME,
    UserSyncCommand.NAME,
    ClearUserSyncsCommand.NAME,
-   FederationAckCommand.NAME,
    UserIpCommand.NAME,
    ErrorCommand.NAME,
    RemoteServerUpCommand.NAME,
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
@@ -39,7 +39,6 @@ from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
 from synapse.replication.tcp.commands import (
     ClearUserSyncsCommand,
     Command,
-    FederationAckCommand,
     PositionCommand,
     RdataCommand,
     RemoteServerUpCommand,
@@ -54,7 +53,6 @@ from synapse.replication.tcp.streams import (
     BackfillStream,
     CachesStream,
     EventsStream,
-    FederationStream,
     ReceiptsStream,
     Stream,
     TagAccountDataStream,
@@ -73,7 +71,6 @@ inbound_rdata_count = Counter(
     "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
 )
 user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
-federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
 remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")

 user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
@@ -157,11 +154,6 @@ class ReplicationCommandHandler:
             if hs.config.worker_app is not None:
                 continue

-            if stream.NAME == FederationStream.NAME and hs.config.send_federation:
-                # We only support federation stream if federation sending
-                # has been disabled on the master.
-                continue
-
             self._streams_to_replicate.append(stream)

         # Map of stream name to batched updates. See RdataCommand for info on
@@ -365,14 +357,6 @@ class ReplicationCommandHandler:
         else:
             return None

-    def on_FEDERATION_ACK(
-        self, conn: IReplicationConnection, cmd: FederationAckCommand
-    ):
-        federation_ack_counter.inc()
-
-        if self._federation_sender:
-            self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
-
     def on_USER_IP(
         self, conn: IReplicationConnection, cmd: UserIpCommand
     ) -> Optional[Awaitable[None]]:
@@ -655,12 +639,6 @@ class ReplicationCommandHandler:
         else:
             logger.warning("Dropping command as not connected: %r", cmd.NAME)

-    def send_federation_ack(self, token: int):
-        """Ack data for the federation stream. This allows the master to drop
-        data stored purely in memory.
-        """
-        self.send_command(FederationAckCommand(self._instance_name, token))
-
     def send_user_sync(
         self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
     ):
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
@@ -30,6 +30,7 @@ from synapse.replication.tcp.streams._base import (
     CachesStream,
     DeviceListsStream,
     GroupServerStream,
+    PresenceFederationStream,
     PresenceStream,
     PublicRoomsStream,
     PushersStream,
@@ -42,7 +43,6 @@ from synapse.replication.tcp.streams._base import (
     UserSignatureStream,
 )
 from synapse.replication.tcp.streams.events import EventsStream
-from synapse.replication.tcp.streams.federation import FederationStream

 STREAMS_MAP = {
     stream.NAME: stream
@@ -50,6 +50,7 @@ STREAMS_MAP = {
     EventsStream,
     BackfillStream,
     PresenceStream,
+    PresenceFederationStream,
     TypingStream,
     ReceiptsStream,
     PushRulesStream,
@@ -58,7 +59,6 @@ STREAMS_MAP = {
     PublicRoomsStream,
     DeviceListsStream,
     ToDeviceStream,
-    FederationStream,
     TagAccountDataStream,
     AccountDataStream,
     GroupServerStream,
@@ -71,6 +71,7 @@ __all__ = [
     "Stream",
     "BackfillStream",
     "PresenceStream",
+    "PresenceFederationStream",
     "TypingStream",
     "ReceiptsStream",
     "PushRulesStream",
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
@@ -290,6 +290,30 @@ class PresenceStream(Stream):
         )


+class PresenceFederationStream(Stream):
+    """A stream used to send ad hoc presence updates over federation.
+
+    Streams the remote destination and the user ID of the presence state to
+    send.
+    """
+
+    @attr.s(slots=True, auto_attribs=True)
+    class PresenceFederationStreamRow:
+        destination: str
+        user_id: str
+
+    NAME = "presence_federation"
+    ROW_TYPE = PresenceFederationStreamRow
+
+    def __init__(self, hs: "HomeServer"):
+        federation_queue = hs.get_presence_handler().get_federation_queue()
+        super().__init__(
+            hs.get_instance_name(),
+            federation_queue.get_current_token,
+            federation_queue.get_replication_rows,
+        )
+
+
 class TypingStream(Stream):
     TypingStreamRow = namedtuple(
         "TypingStreamRow", ("room_id", "user_ids")  # str  # list(str)
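Each row on the new stream pairs a remote destination with a user whose presence should be sent there; the tests later in this diff show such rows surfacing over replication as tuples like (1, ("dest1", "@user1:test")), i.e. (stream position, (destination, user_id)). A quick stand-alone illustration of the row type added above:

import attr


@attr.s(slots=True, auto_attribs=True)
class PresenceFederationStreamRow:
    destination: str
    user_id: str


# One queued update: send @alice:test's presence to remote.example.com.
row = PresenceFederationStreamRow(
    destination="remote.example.com", user_id="@alice:test"
)
assert (row.destination, row.user_id) == ("remote.example.com", "@alice:test")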
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
deleted file mode 100644
@@ -1,80 +0,0 @@
-# Copyright 2017 Vector Creations Ltd
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from collections import namedtuple
-from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
-
-from synapse.replication.tcp.streams._base import (
-    Stream,
-    current_token_without_instance,
-    make_http_update_function,
-)
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-
-class FederationStream(Stream):
-    """Data to be sent over federation. Only available when master has federation
-    sending disabled.
-    """
-
-    FederationStreamRow = namedtuple(
-        "FederationStreamRow",
-        (
-            "type",  # str, the type of data as defined in the BaseFederationRows
-            "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
-        ),
-    )
-
-    NAME = "federation"
-    ROW_TYPE = FederationStreamRow
-
-    def __init__(self, hs: "HomeServer"):
-        if hs.config.worker_app is None:
-            # master process: get updates from the FederationRemoteSendQueue.
-            # (if the master is configured to send federation itself, federation_sender
-            # will be a real FederationSender, which has stubs for current_token and
-            # get_replication_rows.)
-            federation_sender = hs.get_federation_sender()
-            current_token = current_token_without_instance(
-                federation_sender.get_current_token
-            )
-            update_function = (
-                federation_sender.get_replication_rows
-            )  # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
-
-        elif hs.should_send_federation():
-            # federation sender: Query master process
-            update_function = make_http_update_function(hs, self.NAME)
-            current_token = self._stub_current_token
-
-        else:
-            # other worker: stub out the update function (we're not interested in
-            # any updates so when we get a POSITION we do nothing)
-            update_function = self._stub_update_function
-            current_token = self._stub_current_token
-
-        super().__init__(hs.get_instance_name(), current_token, update_function)
-
-    @staticmethod
-    def _stub_current_token(instance_name: str) -> int:
-        # dummy current-token method for use on workers
-        return 0
-
-    @staticmethod
-    async def _stub_update_function(
-        instance_name: str, from_token: int, upto_token: int, limit: int
-    ) -> Tuple[list, int, bool]:
-        return [], upto_token, False
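The stubs at the end of the deleted file are a compact statement of the stream update-function contract: given (instance_name, from_token, upto_token, limit), return (updates, new_upto_token, limited), where updates is a list of (token, row) pairs and limited flags a capped fetch. A toy (non-Synapse) implementation honouring that contract:

from typing import Any, List, Tuple


async def example_update_function(
    instance_name: str, from_token: int, upto_token: int, limit: int
) -> Tuple[List[Tuple[int, Any]], int, bool]:
    # Pretend every token in (from_token, upto_token] has exactly one row.
    pending = [(t, {"token": t}) for t in range(from_token + 1, upto_token + 1)]
    limited = len(pending) > limit
    rows = pending[:limit]
    # When capped, report how far we actually got so the caller re-polls.
    new_upto = rows[-1][0] if limited else upto_token
    return rows, new_upto, limited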
diff --git a/synapse/server.py b/synapse/server.py
@@ -59,7 +59,6 @@ from synapse.federation.federation_server import (
     FederationHandlerRegistry,
     FederationServer,
 )
-from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.sender import AbstractFederationSender, FederationSender
 from synapse.federation.transport.client import TransportLayerClient
 from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
@@ -580,13 +579,11 @@ class HomeServer(metaclass=abc.ABCMeta):
         return TransportLayerClient(self)

     @cache_in_self
-    def get_federation_sender(self) -> AbstractFederationSender:
+    def get_federation_sender(self) -> Optional[AbstractFederationSender]:
         if self.should_send_federation():
             return FederationSender(self)
-        elif not self.config.worker_app:
-            return FederationRemoteSendQueue(self)
         else:
-            raise Exception("Workers cannot send federation traffic")
+            return None

     @cache_in_self
     def get_receipts_handler(self) -> ReceiptsHandler:
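Since get_federation_sender can now return None (previously the master fell back to a FederationRemoteSendQueue and other workers raised), call sites are expected to guard on it, as the "if not self._federation" check in the ModuleApi hunk earlier does. A sketch of the caller-side pattern, at a hypothetical call site:

# Hypothetical call site: only poke the sender when this process has one.
sender = hs.get_federation_sender()
if sender is not None:
    sender.wake_destination("remote.example.com")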
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
@@ -471,6 +471,168 @@ class PresenceHandlerTestCase(unittest.HomeserverTestCase):
         self.assertEqual(state.state, PresenceState.OFFLINE)


+class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
+    def prepare(self, reactor, clock, hs):
+        self.presence_handler = hs.get_presence_handler()
+        self.clock = hs.get_clock()
+        self.instance_name = hs.get_instance_name()
+
+        self.queue = self.presence_handler.get_federation_queue()
+
+    def test_send_and_get(self):
+        state1 = UserPresenceState.default("@user1:test")
+        state2 = UserPresenceState.default("@user2:test")
+        state3 = UserPresenceState.default("@user3:test")
+
+        prev_token = self.queue.get_current_token(self.instance_name)
+
+        self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
+        self.queue.send_presence_to_destinations((state3,), ("dest3",))
+
+        now_token = self.queue.get_current_token(self.instance_name)
+
+        rows, upto_token, limited = self.get_success(
+            self.queue.get_replication_rows("master", prev_token, now_token, 10)
+        )
+
+        self.assertEqual(upto_token, now_token)
+        self.assertFalse(limited)
+
+        expected_rows = [
+            (1, ("dest1", "@user1:test")),
+            (1, ("dest2", "@user1:test")),
+            (1, ("dest1", "@user2:test")),
+            (1, ("dest2", "@user2:test")),
+            (2, ("dest3", "@user3:test")),
+        ]
+
+        self.assertCountEqual(rows, expected_rows)
+
+    def test_send_and_get_split(self):
+        state1 = UserPresenceState.default("@user1:test")
+        state2 = UserPresenceState.default("@user2:test")
+        state3 = UserPresenceState.default("@user3:test")
+
+        prev_token = self.queue.get_current_token(self.instance_name)
+
+        self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
+
+        now_token = self.queue.get_current_token(self.instance_name)
+
+        self.queue.send_presence_to_destinations((state3,), ("dest3",))
+
+        rows, upto_token, limited = self.get_success(
+            self.queue.get_replication_rows("master", prev_token, now_token, 10)
+        )
+
+        self.assertEqual(upto_token, now_token)
+        self.assertFalse(limited)
+
+        expected_rows = [
+            (1, ("dest1", "@user1:test")),
+            (1, ("dest2", "@user1:test")),
+            (1, ("dest1", "@user2:test")),
+            (1, ("dest2", "@user2:test")),
+        ]
+
+        self.assertCountEqual(rows, expected_rows)
+
+    def test_clear_queue_all(self):
+        state1 = UserPresenceState.default("@user1:test")
+        state2 = UserPresenceState.default("@user2:test")
+        state3 = UserPresenceState.default("@user3:test")
+
+        prev_token = self.queue.get_current_token(self.instance_name)
+
+        self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
+        self.queue.send_presence_to_destinations((state3,), ("dest3",))
+
+        self.reactor.advance(10 * 60 * 1000)
+
+        now_token = self.queue.get_current_token(self.instance_name)
+
+        rows, upto_token, limited = self.get_success(
+            self.queue.get_replication_rows("master", prev_token, now_token, 10)
+        )
+        self.assertEqual(upto_token, now_token)
+        self.assertFalse(limited)
+        self.assertCountEqual(rows, [])
+
+        prev_token = self.queue.get_current_token(self.instance_name)
+
+        self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
+        self.queue.send_presence_to_destinations((state3,), ("dest3",))
+
+        now_token = self.queue.get_current_token(self.instance_name)
+
+        rows, upto_token, limited = self.get_success(
+            self.queue.get_replication_rows("master", prev_token, now_token, 10)
+        )
+        self.assertEqual(upto_token, now_token)
+        self.assertFalse(limited)
+
+        expected_rows = [
+            (3, ("dest1", "@user1:test")),
+            (3, ("dest2", "@user1:test")),
+            (3, ("dest1", "@user2:test")),
+            (3, ("dest2", "@user2:test")),
+            (4, ("dest3", "@user3:test")),
+        ]
+
+        self.assertCountEqual(rows, expected_rows)
+
+    def test_partially_clear_queue(self):
+        state1 = UserPresenceState.default("@user1:test")
+        state2 = UserPresenceState.default("@user2:test")
+        state3 = UserPresenceState.default("@user3:test")
+
+        prev_token = self.queue.get_current_token(self.instance_name)
+
+        self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
+
+        self.reactor.advance(2 * 60 * 1000)
+
+        self.queue.send_presence_to_destinations((state3,), ("dest3",))
+
+        self.reactor.advance(4 * 60 * 1000)
+
+        now_token = self.queue.get_current_token(self.instance_name)
+
+        rows, upto_token, limited = self.get_success(
+            self.queue.get_replication_rows("master", prev_token, now_token, 10)
+        )
+        self.assertEqual(upto_token, now_token)
+        self.assertFalse(limited)
+
+        expected_rows = [
+            (2, ("dest3", "@user3:test")),
+        ]
+        self.assertCountEqual(rows, [])
+
+        prev_token = self.queue.get_current_token(self.instance_name)
+
+        self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
+        self.queue.send_presence_to_destinations((state3,), ("dest3",))
+
+        now_token = self.queue.get_current_token(self.instance_name)
+
+        rows, upto_token, limited = self.get_success(
+            self.queue.get_replication_rows("master", prev_token, now_token, 10)
+        )
+        self.assertEqual(upto_token, now_token)
+        self.assertFalse(limited)
+
+        expected_rows = [
+            (3, ("dest1", "@user1:test")),
+            (3, ("dest2", "@user1:test")),
+            (3, ("dest1", "@user2:test")),
+            (3, ("dest2", "@user2:test")),
+            (4, ("dest3", "@user3:test")),
+        ]
+
+        self.assertCountEqual(rows, expected_rows)
+
+
 class PresenceJoinTestCase(unittest.HomeserverTestCase):
     """Tests remote servers get told about presence of users in the room when
     they join and when new local users join.
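The clearing tests above rely on two properties of the queue: positions increase monotonically across sends, and entries older than the queue's expiry window are dropped wholesale, so a fetch whose from_token predates the clear returns nothing rather than a partial backlog. A timeline for test_clear_queue_all, with the expiry window assumed (it is evidently shorter than the ten minutes the test advances the clock):

# Timeline sketch for test_clear_queue_all (all behaviour inferred from the test):
#   t=0      send (state1, state2) -> dest1, dest2    # rows at position 1
#   t=0      send (state3,)        -> dest3           # row  at position 2
#   t=10min  get_replication_rows(prev, now)          # [] : queue was cleared
#   t=10min  resend both batches                      # rows at positions 3 and 4
#   t=10min  get_replication_rows(prev, now)          # returns the five new rows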
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
deleted file mode 100644
@@ -1,80 +0,0 @@
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.federation.send_queue import EduRow
-from synapse.replication.tcp.streams.federation import FederationStream
-
-from tests.replication._base import BaseStreamTestCase
-
-
-class FederationStreamTestCase(BaseStreamTestCase):
-    def _get_worker_hs_config(self) -> dict:
-        # enable federation sending on the worker
-        config = super()._get_worker_hs_config()
-        # TODO: make it so we don't need both of these
-        config["send_federation"] = False
-        config["worker_app"] = "synapse.app.federation_sender"
-        return config
-
-    def test_catchup(self):
-        """Basic test of catchup on reconnect
-
-        Makes sure that updates sent while we are offline are received later.
-        """
-        fed_sender = self.hs.get_federation_sender()
-        received_rows = self.test_handler.received_rdata_rows
-
-        fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
-
-        self.reconnect()
-        self.reactor.advance(0)
-
-        # check we're testing what we think we are: no rows should yet have been
-        # received
-        self.assertEqual(received_rows, [])
-
-        # We should now see an attempt to connect to the master
-        request = self.handle_http_replication_attempt()
-        self.assert_request_is_get_repl_stream_updates(request, "federation")
-
-        # we should have received an update row
-        stream_name, token, row = received_rows.pop()
-        self.assertEqual(stream_name, "federation")
-        self.assertIsInstance(row, FederationStream.FederationStreamRow)
-        self.assertEqual(row.type, EduRow.TypeId)
-        edurow = EduRow.from_data(row.data)
-        self.assertEqual(edurow.edu.edu_type, "m.test_edu")
-        self.assertEqual(edurow.edu.origin, self.hs.hostname)
-        self.assertEqual(edurow.edu.destination, "testdest")
-        self.assertEqual(edurow.edu.content, {"a": "b"})
-
-        self.assertEqual(received_rows, [])
-
-        # additional updates should be transferred without an HTTP hit
-        fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
-        self.reactor.advance(0)
-        # there should be no http hit
-        self.assertEqual(len(self.reactor.tcpClients), 0)
-        # ... but we should have a row
-        self.assertEqual(len(received_rows), 1)
-
-        stream_name, token, row = received_rows.pop()
-        self.assertEqual(stream_name, "federation")
-        self.assertIsInstance(row, FederationStream.FederationStreamRow)
-        self.assertEqual(row.type, EduRow.TypeId)
-        edurow = EduRow.from_data(row.data)
-        self.assertEqual(edurow.edu.edu_type, "m.test1")
-        self.assertEqual(edurow.edu.origin, self.hs.hostname)
-        self.assertEqual(edurow.edu.destination, "testdest")
-        self.assertEqual(edurow.edu.content, {"c": "d"})
diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
deleted file mode 100644
@@ -1,73 +0,0 @@
-# Copyright 2020 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from unittest import mock
-
-from synapse.app.generic_worker import GenericWorkerServer
-from synapse.replication.tcp.commands import FederationAckCommand
-from synapse.replication.tcp.protocol import IReplicationConnection
-from synapse.replication.tcp.streams.federation import FederationStream
-
-from tests.unittest import HomeserverTestCase
-
-
-class FederationAckTestCase(HomeserverTestCase):
-    def default_config(self) -> dict:
-        config = super().default_config()
-        config["worker_app"] = "synapse.app.federation_sender"
-        config["send_federation"] = False
-        return config
-
-    def make_homeserver(self, reactor, clock):
-        hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
-
-        return hs
-
-    def test_federation_ack_sent(self):
-        """A FEDERATION_ACK should be sent back after each RDATA federation
-
-        This test checks that the federation sender is correctly sending back
-        FEDERATION_ACK messages. The test works by spinning up a federation_sender
-        worker server, and then fishing out its ReplicationCommandHandler. We wire
-        the RCH up to a mock connection (so that we can observe the command being sent)
-        and then poke in an RDATA row.
-
-        XXX: it might be nice to do this by pretending to be a synapse master worker
-        (or a redis server), and having the worker connect to us via a mocked-up TCP
-        transport, rather than assuming that the implementation has a
-        ReplicationCommandHandler.
-        """
-        rch = self.hs.get_tcp_replication()
-
-        # wire up the ReplicationCommandHandler to a mock connection, which needs
-        # to implement IReplicationConnection. (Note that Mock doesn't understand
-        # interfaces, but casting an interface to a list gives the attributes.)
-        mock_connection = mock.Mock(spec=list(IReplicationConnection))
-        rch.new_connection(mock_connection)
-
-        # tell it it received an RDATA row
-        self.get_success(
-            rch.on_rdata(
-                "federation",
-                "master",
-                token=10,
-                rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
-            )
-        )
-
-        # now check that the FEDERATION_ACK was sent
-        mock_connection.send_command.assert_called_once()
-        cmd = mock_connection.send_command.call_args[0][0]
-        assert isinstance(cmd, FederationAckCommand)
-        self.assertEqual(cmd.token, 10)