Compare commits

...

8 Commits

Author | SHA1 | Message | Date
Olivier Wilkinson (reivilibre) | d80b3596b5 | Revert "dodgy" (This reverts commit 6c579370d4.) | 2022-12-19 22:30:33 +00:00
Olivier Wilkinson (reivilibre) | 6c579370d4 | dodgy | 2022-12-19 22:05:54 +00:00
Olivier Wilkinson (reivilibre) | 37153a5478 | Add logging | 2022-12-19 21:48:00 +00:00
Olivier Wilkinson (reivilibre) | 5e56736313 | Example: room send event profiling | 2022-12-19 20:55:56 +00:00
Olivier Wilkinson (reivilibre) | bd39e8363c | Add helpers | 2022-12-19 20:55:56 +00:00
Olivier Wilkinson (reivilibre) | 5ebd31c349 | Add Scalene | 2022-12-19 20:20:35 +00:00
Olivier Wilkinson (reivilibre) | 16d70d0627 | Add log line so we can see what's going on | 2022-12-19 20:20:14 +00:00
Olivier Wilkinson (reivilibre) | 68ecfc346b | Build a set of who we are interested in first and foremost | 2022-12-19 20:20:14 +00:00
7 changed files with 2443 additions and 2165 deletions

mypy.ini

@@ -177,6 +177,9 @@ ignore_missing_imports = True
 [mypy-saml2.*]
 ignore_missing_imports = True

+[mypy-scalene.*]
+ignore_missing_imports = True
+
 [mypy-service_identity.*]
 ignore_missing_imports = True

poetry.lock (generated, 4388 changed lines)

File diff suppressed because it is too large.

pyproject.toml

@@ -210,6 +210,8 @@ parameterized = { version = ">=0.7.4", optional = true }
 idna = { version = ">=2.5", optional = true }
 pyicu = { version = ">=2.10.2", optional = true }
+scalene = { version = ">=1.5.16", optional = true, python = ">=3.8,<4.0.0" }
+
 [tool.poetry.extras]
 # NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
 # twice: once here, and once in the `all` extra.

@@ -236,6 +238,8 @@ test = ["parameterized", "idna"]
 # Debian-based distributions).
 user-search = ["pyicu"]
+scalene = ["scalene"]
+
 # The duplication here is awful. I hate hate hate hate hate it. However, for now I want
 # to ensure you can still `pip install matrix-synapse[all]` like today. Two motivations:
 # 1) for new installations, I want instructions in existing documentation and tutorials
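The hunk above registers Scalene as an optional Poetry dependency and exposes it through a `scalene` extra. As a rough illustrative sketch (not code from these commits, which import `scalene` unconditionally in the new hacks module below), an optional extra is usually consumed behind a guarded import so the package still works when the extra is not installed:

# Hypothetical pattern, not part of this branch: degrade gracefully when the
# optional `scalene` extra (pip install matrix-synapse[scalene]) is absent.
try:
    from scalene import scalene_profiler
except ImportError:
    scalene_profiler = None


def profiling_available() -> bool:
    # True only when the optional dependency could be imported.
    return scalene_profiler is not None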

synapse/hacks/__init__.py (new file, 13 lines)

@@ -0,0 +1,13 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

synapse/hacks/selective_scalene_profiling.py (new file)

@@ -0,0 +1,109 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import time
from typing import Any, Callable, Dict, Optional

from scalene import scalene_profiler

logger = logging.getLogger(__name__)


class ProfilingDecider:
    INSTANCES: Dict[str, "ProfilingDecider"] = {}

    def __init__(self, name: str, cond: Callable[[], bool]) -> None:
        ProfilingDecider.INSTANCES[name] = self
        logger.warning("Setting up profiler %r", name)
        # Default to being armed if SCALENE is available as an env var.
        self.armed = b"SCALENE" in os.environb
        self._cond = cond

    def decide(self) -> bool:
        logger.warning("Decide? Armed? %r", self.armed)
        if not self.armed:
            return False
        if not self._cond():
            logger.warning("Cond fail")
            return False
        self.armed = False
        return True


class CpuUtimeTracker:
    def __init__(self) -> None:
        self._update_times(time.time())

    def _update_times(self, now_wall: float) -> None:
        utime, _, _, _, elapsed = os.times()
        self._last_utime = utime
        self._last_elapsed = elapsed
        self._last_wall = now_wall
        self.min_elapse = 0.5
        self.max_elapse = 120.0

    def update_return_utime(self) -> Optional[float]:
        """
        Returns CPU usage over this period, provided at least `min_elapse` seconds
        have elapsed.
        """
        wall = time.time()
        elapsed = wall - self._last_wall
        if elapsed < self.min_elapse:
            logger.warning("Not enough elapsed %r", elapsed)
            return None

        last_utime = self._last_utime
        last_elapsed = self._last_elapsed
        self._update_times(wall)

        if elapsed > self.max_elapse:
            # the average will be a bit skewed if so much time has elapsed. Ignore.
            logger.warning("Too much elapsed %r", elapsed)
            return None

        usage = (self._last_utime - last_utime) / (self._last_elapsed - last_elapsed)
        logger.info("Usage %r", usage)
        return usage


class SelectiveProfiling:
    def __init__(self, decider: ProfilingDecider, enable: bool = False):
        self._decider = decider
        self._enable = enable
        logger.info("selective enable %r", enable)

    def __enter__(self) -> None:
        if not self._enable:
            return
        if not self._decider.decide():
            self._enable = False
            return
        logger.info("STARTING")
        scalene_profiler.start()

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        if not self._enable:
            return
        scalene_profiler.stop()
        logger.info("STOPPED")

synapse/handlers/e2e_keys.py

@@ -300,17 +300,23 @@ class E2eKeysHandler:
             # queries. We use the more efficient batched query_client_keys for all
             # remaining users
             user_ids_updated = []
-            for (user_id, device_list) in destination_query.items():
-                if user_id in user_ids_updated:
-                    continue
-
-                if device_list:
-                    continue
-
-                room_ids = await self.store.get_rooms_for_user(user_id)
-                if not room_ids:
-                    continue

+            # Perform a user device resync for each user only once and only as long as:
+            # - they have an empty device_list
+            # - they are in some rooms that this server can see
+            users_to_resync_devices = {
+                user_id
+                for (user_id, device_list) in destination_query.items()
+                if (not device_list) and (await self.store.get_rooms_for_user(user_id))
+            }
+
+            logger.debug(
+                "%d users to resync devices for from destination %s",
+                len(users_to_resync_devices),
+                destination,
+            )
+
+            for user_id in users_to_resync_devices:
                 # We've decided we're sharing a room with this user and should
                 # probably be tracking their device lists. However, we haven't
                 # done an initial sync on the device list so we do it now.
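For clarity, the filtering performed by the new set comprehension can be reproduced standalone. A rough sketch with a stubbed `get_rooms_for_user` (the user IDs and stub below are invented for illustration); note that `and` short-circuits, so the room lookup is only awaited for users whose device list is empty:

import asyncio
from typing import Dict, List


async def get_rooms_for_user(user_id: str) -> List[str]:
    # Stand-in for store.get_rooms_for_user(): pretend @c shares no rooms with us.
    return [] if user_id == "@c:example.org" else ["!room:example.org"]


async def main() -> None:
    destination_query: Dict[str, List[str]] = {
        "@a:example.org": [],            # empty device list and shared room -> resync
        "@b:example.org": ["DEVICEID"],  # explicit device list -> skip
        "@c:example.org": [],            # empty device list but no shared room -> skip
    }
    users_to_resync_devices = {
        user_id
        for (user_id, device_list) in destination_query.items()
        if (not device_list) and (await get_rooms_for_user(user_id))
    }
    print(users_to_resync_devices)  # {'@a:example.org'}


asyncio.run(main())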

synapse/rest/client/room.py

@@ -38,6 +38,11 @@ from synapse.api.errors import (
 )
 from synapse.api.filtering import Filter
 from synapse.events.utils import format_event_for_client_v2
+from synapse.hacks.selective_scalene_profiling import (
+    CpuUtimeTracker,
+    ProfilingDecider,
+    SelectiveProfiling,
+)
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
     ResolveRoomIdMixin,
@@ -313,6 +318,19 @@ class RoomStateEventRestServlet(TransactionRestServlet):
         return 200, ret


+_sep_cpu = CpuUtimeTracker()
+
+
+def _sep_cond() -> bool:
+    utime = _sep_cpu.update_return_utime()
+    if utime is None:
+        return False
+    return True
+
+
+send_event_profiler = ProfilingDecider("send_event", _sep_cond)
+
+
 # TODO: Needs unit testing for generic events + feedback
 class RoomSendEventRestServlet(TransactionRestServlet):
     def __init__(self, hs: "HomeServer"):
@@ -333,33 +351,38 @@ class RoomSendEventRestServlet(TransactionRestServlet):
         txn_id: Optional[str] = None,
     ) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
-        content = parse_json_object_from_request(request)
+        logger.info("SP!!!")
+        with SelectiveProfiling(
+            send_event_profiler,
+            enable=requester.user.to_string() == "@reivilibre.element:librepush.net",
+        ):
+            content = parse_json_object_from_request(request)

-        event_dict: JsonDict = {
-            "type": event_type,
-            "content": content,
-            "room_id": room_id,
-            "sender": requester.user.to_string(),
-        }
+            event_dict: JsonDict = {
+                "type": event_type,
+                "content": content,
+                "room_id": room_id,
+                "sender": requester.user.to_string(),
+            }

-        if requester.app_service:
-            origin_server_ts = parse_integer(request, "ts")
-            if origin_server_ts is not None:
-                event_dict["origin_server_ts"] = origin_server_ts
+            if requester.app_service:
+                origin_server_ts = parse_integer(request, "ts")
+                if origin_server_ts is not None:
+                    event_dict["origin_server_ts"] = origin_server_ts

-        try:
-            (
-                event,
-                _,
-            ) = await self.event_creation_handler.create_and_send_nonmember_event(
-                requester, event_dict, txn_id=txn_id
-            )
-            event_id = event.event_id
-        except ShadowBanError:
-            event_id = "$" + random_string(43)
+            try:
+                (
+                    event,
+                    _,
+                ) = await self.event_creation_handler.create_and_send_nonmember_event(
+                    requester, event_dict, txn_id=txn_id
+                )
+                event_id = event.event_id
+            except ShadowBanError:
+                event_id = "$" + random_string(43)

-        set_tag("event_id", event_id)
-        return 200, {"event_id": event_id}
+            set_tag("event_id", event_id)
+            return 200, {"event_id": event_id}

     def on_GET(
         self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str