mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
37 Commits
dmr/storag
...
erikj/timi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dbd788c262 | ||
|
|
6882231f78 | ||
|
|
0e2ed5445b | ||
|
|
579d0ef941 | ||
|
|
df6cf1600a | ||
|
|
d88cf9ed73 | ||
|
|
d6a9d4e562 | ||
|
|
b997af9ea8 | ||
|
|
e30d39cd4b | ||
|
|
9e7fc33d93 | ||
|
|
6de4d824e3 | ||
|
|
98fdc155c8 | ||
|
|
105f17ff4a | ||
|
|
266eb3bf26 | ||
|
|
9e54d865e6 | ||
|
|
de580a94b5 | ||
|
|
025458363f | ||
|
|
40c5fffba1 | ||
|
|
52e6bdecb4 | ||
|
|
f4407484aa | ||
|
|
9cd40a2f8d | ||
|
|
6e5fb82bc3 | ||
|
|
aa54bbe250 | ||
|
|
1b4fc178aa | ||
|
|
0417a8607b | ||
|
|
143ff10212 | ||
|
|
7b4f8c527c | ||
|
|
219923eaad | ||
|
|
fd3108fda6 | ||
|
|
09970a70e1 | ||
|
|
892a15190a | ||
|
|
80c4c93db3 | ||
|
|
e77498c8aa | ||
|
|
15f41e6084 | ||
|
|
05f2c86891 | ||
|
|
d375e0038e | ||
|
|
49fda0f1de |
@@ -29,6 +29,7 @@ from synapse.util import unwrapFirstError
|
||||
from synapse.util.async import concurrently_execute, run_on_reactor
|
||||
from synapse.util.caches.snapshot_cache import SnapshotCache
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
from synapse.util.logutils import log_duration
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
from ._base import BaseHandler
|
||||
@@ -734,31 +735,33 @@ class MessageHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _create_new_client_event(self, builder, prev_event_ids=None):
|
||||
if prev_event_ids:
|
||||
prev_events = yield self.store.add_event_hashes(prev_event_ids)
|
||||
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
|
||||
depth = prev_max_depth + 1
|
||||
else:
|
||||
latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
|
||||
builder.room_id,
|
||||
)
|
||||
|
||||
if latest_ret:
|
||||
depth = max([d for _, _, d in latest_ret]) + 1
|
||||
with log_duration("prev_events"):
|
||||
if prev_event_ids:
|
||||
prev_events = yield self.store.add_event_hashes(prev_event_ids)
|
||||
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
|
||||
depth = prev_max_depth + 1
|
||||
else:
|
||||
depth = 1
|
||||
latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
|
||||
builder.room_id,
|
||||
)
|
||||
|
||||
prev_events = [
|
||||
(event_id, prev_hashes)
|
||||
for event_id, prev_hashes, _ in latest_ret
|
||||
]
|
||||
if latest_ret:
|
||||
depth = max([d for _, _, d in latest_ret]) + 1
|
||||
else:
|
||||
depth = 1
|
||||
|
||||
prev_events = [
|
||||
(event_id, prev_hashes)
|
||||
for event_id, prev_hashes, _ in latest_ret
|
||||
]
|
||||
|
||||
builder.prev_events = prev_events
|
||||
builder.depth = depth
|
||||
|
||||
state_handler = self.state_handler
|
||||
|
||||
context = yield state_handler.compute_event_context(builder)
|
||||
with log_duration("context"):
|
||||
context = yield state_handler.compute_event_context(builder)
|
||||
|
||||
if builder.is_state():
|
||||
builder.prev_state = yield self.store.add_event_hashes(
|
||||
@@ -882,14 +885,16 @@ class MessageHandler(BaseHandler):
|
||||
"Changing the room create event is forbidden",
|
||||
)
|
||||
|
||||
action_generator = ActionGenerator(self.hs)
|
||||
yield action_generator.handle_push_actions_for_event(
|
||||
event, context
|
||||
)
|
||||
with log_duration("action_generator"):
|
||||
action_generator = ActionGenerator(self.hs)
|
||||
yield action_generator.handle_push_actions_for_event(
|
||||
event, context
|
||||
)
|
||||
|
||||
(event_stream_id, max_stream_id) = yield self.store.persist_event(
|
||||
event, context=context
|
||||
)
|
||||
with log_duration("persist_event"):
|
||||
(event_stream_id, max_stream_id) = yield self.store.persist_event(
|
||||
event, context=context
|
||||
)
|
||||
|
||||
# this intentionally does not yield: we don't care about the result
|
||||
# and don't need to wait for it.
|
||||
@@ -916,7 +921,8 @@ class MessageHandler(BaseHandler):
|
||||
extra_users=extra_users
|
||||
)
|
||||
|
||||
preserve_fn(_notify)()
|
||||
with log_duration("on_new_room_event"):
|
||||
preserve_fn(_notify)()
|
||||
|
||||
# If invite, remove room_state from unsigned before sending.
|
||||
event.unsigned.pop("invite_room_state", None)
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
|
||||
|
||||
from itertools import chain
|
||||
from collections import Counter
|
||||
|
||||
|
||||
# TODO(paul): I can't believe Python doesn't have one of these
|
||||
@@ -52,30 +53,29 @@ class CounterMetric(BaseMetric):
|
||||
"""The simplest kind of metric; one that stores a monotonically-increasing
|
||||
integer that counts events."""
|
||||
|
||||
__slots__ = ("counts")
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CounterMetric, self).__init__(*args, **kwargs)
|
||||
|
||||
self.counts = {}
|
||||
self.counts = Counter()
|
||||
|
||||
# Scalar metrics are never empty
|
||||
if self.is_scalar():
|
||||
self.counts[()] = 0
|
||||
|
||||
def inc_by(self, incr, *values):
|
||||
if len(values) != self.dimension():
|
||||
raise ValueError(
|
||||
"Expected as many values to inc() as labels (%d)" % (self.dimension())
|
||||
)
|
||||
# if len(values) != self.dimension():
|
||||
# raise ValueError(
|
||||
# "Expected as many values to inc() as labels (%d)" % (self.dimension())
|
||||
# )
|
||||
|
||||
# TODO: should assert that the tag values are all strings
|
||||
|
||||
if values not in self.counts:
|
||||
self.counts[values] = incr
|
||||
else:
|
||||
self.counts[values] += incr
|
||||
self.counts[values] += incr
|
||||
|
||||
def inc(self, *values):
|
||||
self.inc_by(1, *values)
|
||||
self.counts[values] += 1
|
||||
|
||||
def render_item(self, k):
|
||||
return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
|
||||
|
||||
@@ -17,6 +17,7 @@ from twisted.internet import defer
|
||||
|
||||
from .bulk_push_rule_evaluator import evaluator_for_event
|
||||
|
||||
from synapse.util.logutils import log_duration
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
import logging
|
||||
@@ -39,13 +40,15 @@ class ActionGenerator:
|
||||
@defer.inlineCallbacks
|
||||
def handle_push_actions_for_event(self, event, context):
|
||||
with Measure(self.clock, "handle_push_actions_for_event"):
|
||||
bulk_evaluator = yield evaluator_for_event(
|
||||
event, self.hs, self.store, context.current_state
|
||||
)
|
||||
with log_duration("evaluator_for_event"):
|
||||
bulk_evaluator = yield evaluator_for_event(
|
||||
event, self.hs, self.store, context.current_state
|
||||
)
|
||||
|
||||
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
|
||||
event, context.current_state
|
||||
)
|
||||
with log_duration("action_for_event_by_user"):
|
||||
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
|
||||
event, context.current_state
|
||||
)
|
||||
|
||||
context.push_actions = [
|
||||
(uid, actions) for uid, actions in actions_by_user.items()
|
||||
|
||||
@@ -22,6 +22,7 @@ from .push_rule_evaluator import PushRuleEvaluatorForEvent
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.visibility import filter_events_for_clients
|
||||
from synapse.util.logutils import log_duration
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -36,7 +37,8 @@ def decode_rule_json(rule):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_rules(room_id, user_ids, store):
|
||||
rules_by_user = yield store.bulk_get_push_rules(user_ids)
|
||||
with log_duration("bulk_get_push_rules"):
|
||||
rules_by_user = yield store.bulk_get_push_rules(user_ids)
|
||||
|
||||
rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
|
||||
|
||||
@@ -46,25 +48,28 @@ def _get_rules(room_id, user_ids, store):
|
||||
@defer.inlineCallbacks
|
||||
def evaluator_for_event(event, hs, store, current_state):
|
||||
room_id = event.room_id
|
||||
|
||||
# We also will want to generate notifs for other people in the room so
|
||||
# their unread countss are correct in the event stream, but to avoid
|
||||
# generating them for bot / AS users etc, we only do so for people who've
|
||||
# sent a read receipt into the room.
|
||||
|
||||
local_users_in_room = set(
|
||||
e.state_key for e in current_state.values()
|
||||
if e.type == EventTypes.Member and e.membership == Membership.JOIN
|
||||
and hs.is_mine_id(e.state_key)
|
||||
)
|
||||
with log_duration("get_users_in_room"):
|
||||
local_users_in_room = set(
|
||||
e.state_key for e in current_state.values()
|
||||
if e.type == EventTypes.Member and e.membership == Membership.JOIN
|
||||
and hs.is_mine_id(e.state_key)
|
||||
)
|
||||
|
||||
# users in the room who have pushers need to get push rules run because
|
||||
# that's how their pushers work
|
||||
if_users_with_pushers = yield store.get_if_users_have_pushers(
|
||||
local_users_in_room
|
||||
)
|
||||
user_ids = set(
|
||||
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
|
||||
)
|
||||
with log_duration("get_users_with_pushers_in_room"):
|
||||
if_users_with_pushers = yield store.get_if_users_have_pushers(
|
||||
local_users_in_room
|
||||
)
|
||||
user_ids = set(
|
||||
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
|
||||
)
|
||||
|
||||
users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
|
||||
|
||||
|
||||
@@ -231,6 +231,7 @@ class StateHandler(object):
|
||||
)
|
||||
|
||||
logger.info("Resolving state for %s with %d groups", room_id, len(state_groups))
|
||||
logger.info("State groups for %s with %r", room_id, group_names)
|
||||
|
||||
new_state, prev_states = self._resolve_events(
|
||||
state_groups.values(), event_type, state_key
|
||||
|
||||
@@ -94,6 +94,9 @@ class Cache(object):
|
||||
else:
|
||||
return default
|
||||
|
||||
def has(self, key):
|
||||
return key in self.cache
|
||||
|
||||
def update(self, sequence, key, value):
|
||||
self.check_thread()
|
||||
if self.sequence == sequence:
|
||||
@@ -134,6 +137,12 @@ class Cache(object):
|
||||
self.sequence += 1
|
||||
self.cache.clear()
|
||||
|
||||
def __contains__(self, key):
|
||||
return self.has(key)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.get(key)
|
||||
|
||||
|
||||
class CacheDescriptor(object):
|
||||
""" A method decorator that applies a memoizing cache around the function.
|
||||
|
||||
@@ -21,10 +21,23 @@ import logging
|
||||
import inspect
|
||||
import time
|
||||
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
_TIME_FUNC_ID = 0
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def log_duration(name):
|
||||
start = time.time() * 1000
|
||||
yield
|
||||
end = time.time() * 1000
|
||||
logger.info("Timings: %s tooke %dms", name, int(end - start))
|
||||
|
||||
|
||||
def _log_debug_as_f(f, msg, msg_args):
|
||||
name = f.__module__
|
||||
logger = logging.getLogger(name)
|
||||
|
||||
Reference in New Issue
Block a user