mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-11 01:40:27 +00:00
Compare commits
8 Commits
madlittlem
...
rei/select
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d80b3596b5 | ||
|
|
6c579370d4 | ||
|
|
37153a5478 | ||
|
|
5e56736313 | ||
|
|
bd39e8363c | ||
|
|
5ebd31c349 | ||
|
|
16d70d0627 | ||
|
|
68ecfc346b |
3
mypy.ini
3
mypy.ini
@@ -177,6 +177,9 @@ ignore_missing_imports = True
|
|||||||
[mypy-saml2.*]
|
[mypy-saml2.*]
|
||||||
ignore_missing_imports = True
|
ignore_missing_imports = True
|
||||||
|
|
||||||
|
[mypy-scalene.*]
|
||||||
|
ignore_missing_imports = True
|
||||||
|
|
||||||
[mypy-service_identity.*]
|
[mypy-service_identity.*]
|
||||||
ignore_missing_imports = True
|
ignore_missing_imports = True
|
||||||
|
|
||||||
|
|||||||
4388
poetry.lock
generated
4388
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -210,6 +210,8 @@ parameterized = { version = ">=0.7.4", optional = true }
|
|||||||
idna = { version = ">=2.5", optional = true }
|
idna = { version = ">=2.5", optional = true }
|
||||||
pyicu = { version = ">=2.10.2", optional = true }
|
pyicu = { version = ">=2.10.2", optional = true }
|
||||||
|
|
||||||
|
scalene = { version = ">=1.5.16", optional = true, python = ">=3.8,<4.0.0" }
|
||||||
|
|
||||||
[tool.poetry.extras]
|
[tool.poetry.extras]
|
||||||
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
|
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
|
||||||
# twice: once here, and once in the `all` extra.
|
# twice: once here, and once in the `all` extra.
|
||||||
@@ -236,6 +238,8 @@ test = ["parameterized", "idna"]
|
|||||||
# Debian-based distributions).
|
# Debian-based distributions).
|
||||||
user-search = ["pyicu"]
|
user-search = ["pyicu"]
|
||||||
|
|
||||||
|
scalene = ["scalene"]
|
||||||
|
|
||||||
# The duplication here is awful. I hate hate hate hate hate it. However, for now I want
|
# The duplication here is awful. I hate hate hate hate hate it. However, for now I want
|
||||||
# to ensure you can still `pip install matrix-synapse[all]` like today. Two motivations:
|
# to ensure you can still `pip install matrix-synapse[all]` like today. Two motivations:
|
||||||
# 1) for new installations, I want instructions in existing documentation and tutorials
|
# 1) for new installations, I want instructions in existing documentation and tutorials
|
||||||
|
|||||||
13
synapse/hacks/__init__.py
Normal file
13
synapse/hacks/__init__.py
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
109
synapse/hacks/selective_scalene_profiling.py
Normal file
109
synapse/hacks/selective_scalene_profiling.py
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from typing import Any, Callable, Dict, Optional
|
||||||
|
|
||||||
|
from scalene import scalene_profiler
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ProfilingDecider:
|
||||||
|
INSTANCES: Dict[str, "ProfilingDecider"] = {}
|
||||||
|
|
||||||
|
def __init__(self, name: str, cond: Callable[[], bool]) -> None:
|
||||||
|
ProfilingDecider.INSTANCES[name] = self
|
||||||
|
|
||||||
|
logger.warning("Setting up profiler %r", name)
|
||||||
|
|
||||||
|
# Default to being armed if SCALENE is available as an env var.
|
||||||
|
self.armed = b"SCALENE" in os.environb
|
||||||
|
|
||||||
|
self._cond = cond
|
||||||
|
|
||||||
|
def decide(self) -> bool:
|
||||||
|
logger.warning("Decide? Armed? %r", self.armed)
|
||||||
|
if not self.armed:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not self._cond():
|
||||||
|
logger.warning("Cond fail")
|
||||||
|
return False
|
||||||
|
|
||||||
|
self.armed = False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class CpuUtimeTracker:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._update_times(time.time())
|
||||||
|
|
||||||
|
def _update_times(self, now_wall: float) -> None:
|
||||||
|
utime, _, _, _, elapsed = os.times()
|
||||||
|
self._last_utime = utime
|
||||||
|
self._last_elapsed = elapsed
|
||||||
|
self._last_wall = now_wall
|
||||||
|
|
||||||
|
self.min_elapse = 0.5
|
||||||
|
self.max_elapse = 120.0
|
||||||
|
|
||||||
|
def update_return_utime(self) -> Optional[float]:
|
||||||
|
"""
|
||||||
|
Returns CPU usage over this period, provided at least `min_elapse` have
|
||||||
|
elapsed.
|
||||||
|
"""
|
||||||
|
wall = time.time()
|
||||||
|
elapsed = wall - self._last_wall
|
||||||
|
if elapsed < self.min_elapse:
|
||||||
|
logger.warning("Not enough elapsed %r", elapsed)
|
||||||
|
return None
|
||||||
|
|
||||||
|
last_utime = self._last_utime
|
||||||
|
last_elapsed = self._last_elapsed
|
||||||
|
|
||||||
|
self._update_times(wall)
|
||||||
|
|
||||||
|
if elapsed > self.max_elapse:
|
||||||
|
# the average will be a bit skewy if so much time has elapsed. Ignore.
|
||||||
|
logger.warning("Too much elapsed %r", elapsed)
|
||||||
|
return None
|
||||||
|
|
||||||
|
usage = (self._last_utime - last_utime) / (self._last_elapsed - last_elapsed)
|
||||||
|
logger.info("Usage %r", usage)
|
||||||
|
return usage
|
||||||
|
|
||||||
|
|
||||||
|
class SelectiveProfiling:
|
||||||
|
def __init__(self, decider: ProfilingDecider, enable: bool = False):
|
||||||
|
self._decider = decider
|
||||||
|
self._enable = enable
|
||||||
|
logger.info("selective enable %r", enable)
|
||||||
|
|
||||||
|
def __enter__(self) -> None:
|
||||||
|
if not self._enable:
|
||||||
|
return
|
||||||
|
if not self._decider.decide():
|
||||||
|
self._enable = False
|
||||||
|
return
|
||||||
|
logger.info("STARTING")
|
||||||
|
scalene_profiler.start()
|
||||||
|
|
||||||
|
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
||||||
|
if not self._enable:
|
||||||
|
return
|
||||||
|
scalene_profiler.stop()
|
||||||
|
logger.info("STOPPED")
|
||||||
@@ -300,17 +300,23 @@ class E2eKeysHandler:
|
|||||||
# queries. We use the more efficient batched query_client_keys for all
|
# queries. We use the more efficient batched query_client_keys for all
|
||||||
# remaining users
|
# remaining users
|
||||||
user_ids_updated = []
|
user_ids_updated = []
|
||||||
for (user_id, device_list) in destination_query.items():
|
|
||||||
if user_id in user_ids_updated:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if device_list:
|
# Perform a user device resync for each user only once and only as long as:
|
||||||
continue
|
# - they have an empty device_list
|
||||||
|
# - they are in some rooms that this server can see
|
||||||
|
users_to_resync_devices = {
|
||||||
|
user_id
|
||||||
|
for (user_id, device_list) in destination_query.items()
|
||||||
|
if (not device_list) and (await self.store.get_rooms_for_user(user_id))
|
||||||
|
}
|
||||||
|
|
||||||
room_ids = await self.store.get_rooms_for_user(user_id)
|
logger.debug(
|
||||||
if not room_ids:
|
"%d users to resync devices for from destination %s",
|
||||||
continue
|
len(users_to_resync_devices),
|
||||||
|
destination,
|
||||||
|
)
|
||||||
|
|
||||||
|
for user_id in users_to_resync_devices:
|
||||||
# We've decided we're sharing a room with this user and should
|
# We've decided we're sharing a room with this user and should
|
||||||
# probably be tracking their device lists. However, we haven't
|
# probably be tracking their device lists. However, we haven't
|
||||||
# done an initial sync on the device list so we do it now.
|
# done an initial sync on the device list so we do it now.
|
||||||
|
|||||||
@@ -38,6 +38,11 @@ from synapse.api.errors import (
|
|||||||
)
|
)
|
||||||
from synapse.api.filtering import Filter
|
from synapse.api.filtering import Filter
|
||||||
from synapse.events.utils import format_event_for_client_v2
|
from synapse.events.utils import format_event_for_client_v2
|
||||||
|
from synapse.hacks.selective_scalene_profiling import (
|
||||||
|
CpuUtimeTracker,
|
||||||
|
ProfilingDecider,
|
||||||
|
SelectiveProfiling,
|
||||||
|
)
|
||||||
from synapse.http.server import HttpServer
|
from synapse.http.server import HttpServer
|
||||||
from synapse.http.servlet import (
|
from synapse.http.servlet import (
|
||||||
ResolveRoomIdMixin,
|
ResolveRoomIdMixin,
|
||||||
@@ -313,6 +318,19 @@ class RoomStateEventRestServlet(TransactionRestServlet):
|
|||||||
return 200, ret
|
return 200, ret
|
||||||
|
|
||||||
|
|
||||||
|
_sep_cpu = CpuUtimeTracker()
|
||||||
|
|
||||||
|
|
||||||
|
def _sep_cond() -> bool:
|
||||||
|
utime = _sep_cpu.update_return_utime()
|
||||||
|
if utime is None:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
send_event_profiler = ProfilingDecider("send_event", _sep_cond)
|
||||||
|
|
||||||
|
|
||||||
# TODO: Needs unit testing for generic events + feedback
|
# TODO: Needs unit testing for generic events + feedback
|
||||||
class RoomSendEventRestServlet(TransactionRestServlet):
|
class RoomSendEventRestServlet(TransactionRestServlet):
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
@@ -333,33 +351,38 @@ class RoomSendEventRestServlet(TransactionRestServlet):
|
|||||||
txn_id: Optional[str] = None,
|
txn_id: Optional[str] = None,
|
||||||
) -> Tuple[int, JsonDict]:
|
) -> Tuple[int, JsonDict]:
|
||||||
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
||||||
content = parse_json_object_from_request(request)
|
logger.info("SP!!!")
|
||||||
|
with SelectiveProfiling(
|
||||||
|
send_event_profiler,
|
||||||
|
enable=requester.user.to_string() == "@reivilibre.element:librepush.net",
|
||||||
|
):
|
||||||
|
content = parse_json_object_from_request(request)
|
||||||
|
|
||||||
event_dict: JsonDict = {
|
event_dict: JsonDict = {
|
||||||
"type": event_type,
|
"type": event_type,
|
||||||
"content": content,
|
"content": content,
|
||||||
"room_id": room_id,
|
"room_id": room_id,
|
||||||
"sender": requester.user.to_string(),
|
"sender": requester.user.to_string(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if requester.app_service:
|
if requester.app_service:
|
||||||
origin_server_ts = parse_integer(request, "ts")
|
origin_server_ts = parse_integer(request, "ts")
|
||||||
if origin_server_ts is not None:
|
if origin_server_ts is not None:
|
||||||
event_dict["origin_server_ts"] = origin_server_ts
|
event_dict["origin_server_ts"] = origin_server_ts
|
||||||
|
|
||||||
try:
|
try:
|
||||||
(
|
(
|
||||||
event,
|
event,
|
||||||
_,
|
_,
|
||||||
) = await self.event_creation_handler.create_and_send_nonmember_event(
|
) = await self.event_creation_handler.create_and_send_nonmember_event(
|
||||||
requester, event_dict, txn_id=txn_id
|
requester, event_dict, txn_id=txn_id
|
||||||
)
|
)
|
||||||
event_id = event.event_id
|
event_id = event.event_id
|
||||||
except ShadowBanError:
|
except ShadowBanError:
|
||||||
event_id = "$" + random_string(43)
|
event_id = "$" + random_string(43)
|
||||||
|
|
||||||
set_tag("event_id", event_id)
|
set_tag("event_id", event_id)
|
||||||
return 200, {"event_id": event_id}
|
return 200, {"event_id": event_id}
|
||||||
|
|
||||||
def on_GET(
|
def on_GET(
|
||||||
self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str
|
self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str
|
||||||
|
|||||||
Reference in New Issue
Block a user