Compare commits

...

2 Commits

Author SHA1 Message Date
Erik Johnston
a48296dd86 WIP docs 2021-07-28 11:06:24 +01:00
Erik Johnston
13f9422e38 Allow /typing to be handled by any worker 2021-07-28 10:58:45 +01:00
5 changed files with 145 additions and 22 deletions

View File

@@ -319,11 +319,24 @@ effects of bursts of events from that bridge on events sent by normal users.
#### Stream writers
Additionally, there is *experimental* support for moving writing of specific
streams (such as events) off of the main process to a particular worker. (This
is only supported with Redis-based replication.)
Additionally, there is support for moving writing of specific streams (such as
events) off of the main process to a particular worker. (This is only supported
with Redis-based replication.)
Currently supported streams are `events` and `typing`.
Currently supported streams are, and which endpoints **must** be routed to them:
* `events`
* `typing`:
* `^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/typing`
* `to_device`:
`^/_matrix/client/(api/v1|r0|unstable)/sendToDevice/`
`^/_matrix/client/(api/v1|r0|unstable)/keys/claim`
`^/_matrix/client/(api/v1|r0|unstable)/room_keys`
* `account_data`
* `receipts`
* `presence`
To enable this, the worker must have a HTTP replication listener configured,
have a `worker_name` and be listed in the `instance_map` config. For example to
@@ -340,10 +353,10 @@ stream_writers:
events: event_persister1
```
The `events` stream also experimentally supports having multiple writers, where
work is sharded between them by room ID. Note that you *must* restart all worker
instances when adding or removing event persisters. An example `stream_writers`
configuration with multiple writers:
The `events` stream also supports having multiple writers, where work is sharded
between them by room ID. Note that you *must* restart all worker instances when
adding or removing event persisters. An example `stream_writers` configuration
with multiple writers:
```yaml
stream_writers:
@@ -352,6 +365,8 @@ stream_writers:
- event_persister2
```
All other streams currently only support having a single writer.
#### Background tasks
There is also *experimental* support for moving background tasks to a separate

View File

@@ -22,6 +22,7 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.replication.http.typing import ReplicationTypingRestServlet
from synapse.replication.tcp.streams import TypingStream
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -61,7 +62,9 @@ class FollowerTypingHandler:
if hs.should_send_federation():
self.federation = hs.get_federation_sender()
if hs.config.worker.writers.typing != hs.get_instance_name():
self._typing_repl_client = ReplicationTypingRestServlet.make_client(hs)
self._typing_worker = hs.config.worker.writers.typing
if self._typing_worker != hs.get_instance_name():
hs.get_federation_registry().register_instance_for_edu(
"m.typing",
hs.config.worker.writers.typing,
@@ -199,6 +202,30 @@ class FollowerTypingHandler:
def get_current_token(self) -> int:
return self._latest_room_serial
async def started_typing(
self, target_user: UserID, requester: Requester, room_id: str, timeout: int
) -> None:
await self._typing_repl_client(
typing=True,
instance_name=self._typing_worker,
user_id=target_user.to_string(),
requester=requester,
room_id=room_id,
timeout=timeout,
)
async def stopped_typing(
self, target_user: UserID, requester: Requester, room_id: str
) -> None:
await self._typing_repl_client(
typing=True,
instance_name=self._typing_worker,
user_id=target_user.to_string(),
requester=requester,
room_id=room_id,
timeout=None,
)
class TypingWriterHandler(FollowerTypingHandler):
def __init__(self, hs: "HomeServer"):

View File

@@ -24,6 +24,7 @@ from synapse.replication.http import (
register,
send_event,
streams,
typing,
)
REPLICATION_PREFIX = "/_synapse/replication"
@@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource):
streams.register_servlets(hs, self)
account_data.register_servlets(hs, self)
push.register_servlets(hs, self)
typing.register_servlets(hs, self)
# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:

View File

@@ -0,0 +1,89 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.types import Requester, UserID
from typing import TYPE_CHECKING
import logging
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ReplicationTypingRestServlet(ReplicationEndpoint):
"""Call to start or stop a user typing in a room.
Request format:
POST /_synapse/replication/typing/:room_id/:user_id
{
"requester": ...,
"typing": true,
"timeout": 30000
}
"""
NAME = "typing"
PATH_ARGS = ("room_id", "user_id")
CACHE = False
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.handler = hs.get_typing_handler()
self.store = hs.get_datastore()
@staticmethod
async def _serialize_payload(requester, room_id, user_id, typing, timeout):
payload = {
"requester": requester.serialize(),
"typing": typing,
"timeout": timeout,
}
return payload
async def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
requester = Requester.deserialize(self.store, content["requester"])
request.requester = requester
target_user = UserID.from_string(user_id)
if content["typing"]:
await self.handler.started_typing(
target_user,
requester,
room_id,
content["timeout"],
)
else:
await self.handler.stopped_typing(
target_user,
requester,
room_id,
)
return 200, {}
def register_servlets(hs, http_server):
ReplicationTypingRestServlet(hs).register(http_server)

View File

@@ -1254,18 +1254,11 @@ class RoomTypingRestServlet(RestServlet):
self.presence_handler = hs.get_presence_handler()
self.auth = hs.get_auth()
# If we're not on the typing writer instance we should scream if we get
# requests.
self._is_typing_writer = (
hs.config.worker.writers.typing == hs.get_instance_name()
)
self.handler = hs.get_typing_handler()
async def on_PUT(self, request, room_id, user_id):
requester = await self.auth.get_user_by_req(request)
if not self._is_typing_writer:
raise Exception("Got /typing request on instance that is not typing writer")
room_id = urlparse.unquote(room_id)
target_user = UserID.from_string(urlparse.unquote(user_id))
@@ -1276,19 +1269,16 @@ class RoomTypingRestServlet(RestServlet):
# Limit timeout to stop people from setting silly typing timeouts.
timeout = min(content.get("timeout", 30000), 120000)
# Defer getting the typing handler since it will raise on workers.
typing_handler = self.hs.get_typing_writer_handler()
try:
if content["typing"]:
await typing_handler.started_typing(
await self.handler.started_typing(
target_user=target_user,
requester=requester,
room_id=room_id,
timeout=timeout,
)
else:
await typing_handler.stopped_typing(
await self.handler.stopped_typing(
target_user=target_user, requester=requester, room_id=room_id
)
except ShadowBanError: