mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
2 Commits
v1.35.1
...
function_t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
75bf48b905 | ||
|
|
d1ae594ae5 |
213
scripts/graph_tracer.py
Normal file
213
scripts/graph_tracer.py
Normal file
@@ -0,0 +1,213 @@
|
||||
import fileinput
|
||||
import pydot
|
||||
import sys
|
||||
import itertools
|
||||
import json
|
||||
|
||||
|
||||
def pairwise(iterable):
|
||||
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
|
||||
a, b = itertools.tee(iterable)
|
||||
next(b, None)
|
||||
return itertools.izip(a, b)
|
||||
|
||||
|
||||
nodes = {}
|
||||
edges = set()
|
||||
|
||||
graph = pydot.Dot(graph_name="call_graph", graph_type="digraph")
|
||||
|
||||
names = {}
|
||||
starts = {}
|
||||
ends = {}
|
||||
deferreds = set()
|
||||
deferreds_map = {}
|
||||
deferred_edges = set()
|
||||
|
||||
root_id = None
|
||||
|
||||
for line in fileinput.input():
|
||||
line = line.strip()
|
||||
try:
|
||||
if " calls " in line:
|
||||
start, end = line.split(" calls ")
|
||||
start, end = start.strip(), end.strip()
|
||||
edges.add((start, end))
|
||||
# print start, end
|
||||
if " named " in line:
|
||||
node_id, name = line.split(" named ")
|
||||
names[node_id.strip()] = name.strip()
|
||||
|
||||
if name.strip() == "synapse.rest.client.v1.room.RoomSendEventRestServlet.on_PUT":
|
||||
root_id = node_id
|
||||
if " in " in line:
|
||||
node_id, d = line.split(" in ")
|
||||
deferreds_map[node_id.strip()] = d.strip()
|
||||
if " is deferred" in line:
|
||||
node_id, _ = line.split(" is deferred")
|
||||
deferreds.add(node_id)
|
||||
if " start " in line:
|
||||
node_id, ms = line.split(" start ")
|
||||
starts[node_id.strip()] = int(ms.strip())
|
||||
if " end " in line:
|
||||
node_id, ms = line.split(" end ")
|
||||
ends[node_id.strip()] = int(ms.strip())
|
||||
if " waits on " in line:
|
||||
start, end = line.split(" waits on ")
|
||||
start, end = start.strip(), end.strip()
|
||||
deferred_edges.add((start, end))
|
||||
# print start, end
|
||||
except Exception as e:
|
||||
sys.stderr.write("failed %s to parse '%s'\n" % (e.message, line))
|
||||
|
||||
if not root_id:
|
||||
sys.stderr.write("Could not find root")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# deferreds_root = set(deferreds.values())
|
||||
# for parent, child in deferred_edges:
|
||||
# deferreds_root.discard(child)
|
||||
#
|
||||
# deferred_tree = {
|
||||
# d: {}
|
||||
# for d in deferreds_root
|
||||
# }
|
||||
#
|
||||
# def populate(root, tree):
|
||||
# for leaf in deferred_edges.get(root, []):
|
||||
# populate(leaf, tree.setdefault(leaf, {}))
|
||||
#
|
||||
#
|
||||
# for d in deferreds_root:
|
||||
# tree = deferred_tree.setdefault(d, {})
|
||||
# populate(d, tree)
|
||||
|
||||
# print deferred_edges
|
||||
# print root_id
|
||||
|
||||
def is_in_deferred(d):
|
||||
while True:
|
||||
if d == root_id:
|
||||
return True
|
||||
|
||||
for start, end in deferred_edges:
|
||||
if d == end:
|
||||
d = start
|
||||
break
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def walk_graph(d):
|
||||
res = [d]
|
||||
while d != root_id:
|
||||
for start, end in edges:
|
||||
if d == end:
|
||||
d = start
|
||||
res.append(d)
|
||||
break
|
||||
else:
|
||||
return res
|
||||
return res
|
||||
|
||||
|
||||
def make_tree_el(node_id):
|
||||
return {
|
||||
"id": node_id,
|
||||
"name": names[node_id],
|
||||
"children": [],
|
||||
"start": starts[node_id],
|
||||
"end": ends[node_id],
|
||||
"size": ends[node_id] - starts[node_id],
|
||||
}
|
||||
|
||||
tree = make_tree_el(root_id)
|
||||
|
||||
tree_index = {
|
||||
root_id: tree,
|
||||
}
|
||||
|
||||
|
||||
viz_out = {
|
||||
"nodes": [],
|
||||
"edges": [],
|
||||
}
|
||||
|
||||
for node_id, name in names.items():
|
||||
# if times.get(node_id, 100) < 5:
|
||||
# continue
|
||||
|
||||
walk = walk_graph(node_id)
|
||||
# print walk
|
||||
if root_id not in walk:
|
||||
continue
|
||||
|
||||
if node_id in deferreds:
|
||||
if not is_in_deferred(node_id):
|
||||
continue
|
||||
elif node_id in deferreds_map:
|
||||
if not is_in_deferred(deferreds_map[node_id]):
|
||||
continue
|
||||
|
||||
walk_names = [
|
||||
names[w].split("synapse.", 1)[1] for w in walk
|
||||
]
|
||||
|
||||
for child, parent in reversed(list(pairwise(walk))):
|
||||
if parent in tree_index and child not in tree_index:
|
||||
el = make_tree_el(child)
|
||||
tree_index[parent]["children"].append(el)
|
||||
tree_index[child] = el
|
||||
|
||||
# print "-".join(reversed(["end"] + walk_names)) + ", " + str(ends[node_id] - starts[node_id])
|
||||
# print "%d,%s,%s,%s" % (len(walk), walk_names[0], starts[node_id], ends[node_id])
|
||||
|
||||
viz_out["nodes"].append({
|
||||
"id": node_id,
|
||||
"label": names[node_id].split("synapse.", 1)[1],
|
||||
"value": ends[node_id] - starts[node_id],
|
||||
"level": len(walk),
|
||||
})
|
||||
|
||||
node = pydot.Node(node_id, label=name)
|
||||
|
||||
# if node_id in deferreds:
|
||||
# clusters[deferreds[node_id]].add_node(node)
|
||||
# elif node_id in clusters:
|
||||
# clusters[node_id].add_node(node)
|
||||
# else:
|
||||
# graph.add_node(node)
|
||||
graph.add_node(node)
|
||||
nodes[node_id] = node
|
||||
|
||||
# print node_id
|
||||
|
||||
# for el in tree_index.values():
|
||||
# el["children"].sort(key=lambda e: e["start"])
|
||||
#
|
||||
# print json.dumps(tree)
|
||||
|
||||
for parent, child in edges:
|
||||
if child not in nodes:
|
||||
# sys.stderr.write(child + " not a node\n")
|
||||
continue
|
||||
|
||||
if parent not in nodes:
|
||||
# sys.stderr.write(parent + " not a node\n")
|
||||
continue
|
||||
|
||||
viz_out["edges"].append({
|
||||
"from": parent,
|
||||
"to": child,
|
||||
"value": ends[child] - starts[child],
|
||||
})
|
||||
|
||||
edge = pydot.Edge(nodes[parent], nodes[child])
|
||||
graph.add_edge(edge)
|
||||
|
||||
print json.dumps(viz_out)
|
||||
|
||||
file_prefix = "call_graph_out"
|
||||
graph.write('%s.dot' % file_prefix, format='raw', prog='dot')
|
||||
graph.write_svg("%s.svg" % file_prefix, prog='dot')
|
||||
@@ -51,6 +51,8 @@ from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource
|
||||
from daemonize import Daemonize
|
||||
import twisted.manhole.telnet
|
||||
|
||||
from synapse.util.traceutil import Tracer
|
||||
|
||||
import synapse
|
||||
|
||||
import logging
|
||||
@@ -61,6 +63,7 @@ import subprocess
|
||||
import sqlite3
|
||||
import syweb
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -399,8 +402,13 @@ class SynapseService(service.Service):
|
||||
|
||||
|
||||
def run(hs):
|
||||
|
||||
def in_thread():
|
||||
try:
|
||||
tracer = Tracer()
|
||||
sys.settrace(tracer.process)
|
||||
except Exception:
|
||||
logger.exception("Failed to start tracer")
|
||||
|
||||
with LoggingContext("run"):
|
||||
change_resource_limit(hs.config.soft_file_limit)
|
||||
|
||||
|
||||
@@ -73,7 +73,6 @@ class FederationHandler(BaseHandler):
|
||||
# When joining a room we need to queue any events for that room up
|
||||
self.room_queues = {}
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def handle_new_event(self, event, destinations):
|
||||
""" Takes in an event from the client to server side, that has already
|
||||
|
||||
@@ -165,6 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
def trace(f):
|
||||
f.should_trace = True
|
||||
f.root_trace = True
|
||||
return f
|
||||
|
||||
|
||||
# TODO: Needs unit testing for generic events + feedback
|
||||
class RoomSendEventRestServlet(ClientV1RestServlet):
|
||||
|
||||
@@ -175,7 +181,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request, room_id, event_type, txn_id=None):
|
||||
import inspect
|
||||
frame = inspect.currentframe()
|
||||
logger.info("Frame: %s", id(frame))
|
||||
user, client = yield self.auth.get_user_by_req(request)
|
||||
logger.info("Frame: %s", id(inspect.currentframe()))
|
||||
content = _parse_json(request)
|
||||
|
||||
msg_handler = self.handlers.message_handler
|
||||
@@ -189,12 +199,14 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
|
||||
client=client,
|
||||
txn_id=txn_id,
|
||||
)
|
||||
logger.info("Frame: %s", id(inspect.currentframe()))
|
||||
|
||||
defer.returnValue((200, {"event_id": event.event_id}))
|
||||
|
||||
def on_GET(self, request, room_id, event_type, txn_id):
|
||||
return (200, "Not implemented")
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, request, room_id, event_type, txn_id):
|
||||
try:
|
||||
|
||||
286
synapse/util/traceutil.py
Normal file
286
synapse/util/traceutil.py
Normal file
@@ -0,0 +1,286 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import time
|
||||
|
||||
import inspect
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger("Tracer")
|
||||
|
||||
|
||||
class Tracer(object):
|
||||
|
||||
def __init__(self):
|
||||
self.interested_deferreds = set()
|
||||
|
||||
self.next_id = 1
|
||||
|
||||
self.deferred_frames = {}
|
||||
self.deferred_to_current_frames = {}
|
||||
|
||||
def process(self, frame, event, arg):
|
||||
if event == 'call':
|
||||
return self.process_call(frame)
|
||||
|
||||
def handle_inline_callbacks(self, frm):
|
||||
argvalues = inspect.getargvalues(frm)
|
||||
generator = argvalues.locals["g"]
|
||||
deferred = argvalues.locals["deferred"]
|
||||
|
||||
if not hasattr(deferred, "syn_trace_defer_id"):
|
||||
trace_id = self.get_next_id()
|
||||
deferred.syn_trace_defer_id = trace_id
|
||||
logger.info(
|
||||
"%s named %s",
|
||||
trace_id,
|
||||
self.get_name_for_frame(generator.gi_frame)
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"%s is deferred",
|
||||
trace_id,
|
||||
)
|
||||
|
||||
logger.info("%s start %d", trace_id, int(time.time() * 1000))
|
||||
|
||||
def do(res):
|
||||
logger.info("%s end %d", trace_id, int(time.time() * 1000))
|
||||
return res
|
||||
|
||||
deferred.addBoth(do)
|
||||
|
||||
back = frm.f_back
|
||||
while back:
|
||||
try:
|
||||
name = self.get_name_for_frame(back)
|
||||
if name == "twisted.internet.defer._inlineCallbacks":
|
||||
argvalues = inspect.getargvalues(back)
|
||||
deferred = argvalues.locals["deferred"]
|
||||
|
||||
d_id = getattr(deferred, "syn_trace_defer_id", None)
|
||||
if d_id:
|
||||
logger.info("%s in %s", trace_id, d_id)
|
||||
curr_stack = self.deferred_to_current_frames.setdefault(
|
||||
d_id, []
|
||||
)
|
||||
|
||||
if curr_stack:
|
||||
logger.info("%s calls %s", curr_stack[-1], trace_id)
|
||||
else:
|
||||
logger.info("%s calls %s", d_id, trace_id)
|
||||
break
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
back = back.f_back
|
||||
|
||||
def are_interested(self, name):
|
||||
if not name.startswith("synapse"):
|
||||
return False
|
||||
if name.startswith("synapse.util.logcontext"):
|
||||
return False
|
||||
if name.startswith("synapse.util.logutils"):
|
||||
return False
|
||||
if name.startswith("synapse.util.traceutil"):
|
||||
return False
|
||||
if name.startswith("synapse.events.FrozenEvent.get"):
|
||||
return False
|
||||
if name.startswith("synapse.events.EventBuilder.get"):
|
||||
return False
|
||||
if name.startswith("synapse.types"):
|
||||
return False
|
||||
if name.startswith("synapse.util.frozenutils.freeze"):
|
||||
return False
|
||||
if name.startswith("synapse.util.frozenutils.<dictcomp>"):
|
||||
return False
|
||||
if name.startswith("synapse.util.Clock"):
|
||||
return False
|
||||
|
||||
if name.endswith("__repr__") or name.endswith("__str__"):
|
||||
return False
|
||||
if name.endswith("<genexpr>"):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def process_call(self, frame):
|
||||
should_trace = False
|
||||
|
||||
try:
|
||||
name = self.get_name_for_frame(frame)
|
||||
if name == "twisted.internet.defer._inlineCallbacks":
|
||||
self.handle_inline_callbacks(frame)
|
||||
return
|
||||
|
||||
if not self.are_interested(name):
|
||||
return
|
||||
|
||||
back_name = self.get_name_for_frame(frame.f_back)
|
||||
|
||||
if name == "synapse.api.auth.Auth.get_user_by_req":
|
||||
logger.info(
|
||||
"synapse.api.auth.Auth.get_user_by_req %s",
|
||||
back_name
|
||||
)
|
||||
|
||||
try:
|
||||
if back_name == "twisted.internet.defer._inlineCallbacks":
|
||||
def ret(f, event, result):
|
||||
if event != "return":
|
||||
return
|
||||
|
||||
argvalues = inspect.getargvalues(frame.f_back)
|
||||
deferred = argvalues.locals["deferred"]
|
||||
|
||||
try:
|
||||
logger.info(
|
||||
"%s waits on %s",
|
||||
deferred.syn_trace_defer_id,
|
||||
result.syn_trace_defer_id
|
||||
)
|
||||
except:
|
||||
pass
|
||||
return ret
|
||||
if back_name == "twisted.internet.defer.unwindGenerator":
|
||||
return
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
func = getattr(frame.f_locals["self"], frame.f_code.co_name)
|
||||
if inspect.isgeneratorfunction(func):
|
||||
return
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
func = frame.f_globals[frame.f_code.co_name]
|
||||
if inspect.isgeneratorfunction(func):
|
||||
return
|
||||
except:
|
||||
pass
|
||||
except:
|
||||
return
|
||||
|
||||
back = frame
|
||||
names = []
|
||||
|
||||
seen_deferreds = []
|
||||
bottom_deferred = None
|
||||
while back:
|
||||
try:
|
||||
name = self.get_name_for_frame(back)
|
||||
if name.startswith("synapse"):
|
||||
names.append(name)
|
||||
|
||||
# if name.startswith("twisted.internet.defer"):
|
||||
# logger.info("Name: %s", name)
|
||||
|
||||
if name == "twisted.internet.defer._inlineCallbacks":
|
||||
argvalues = inspect.getargvalues(back)
|
||||
deferred = argvalues.locals["deferred"]
|
||||
|
||||
d_id = getattr(deferred, "syn_trace_defer_id", None)
|
||||
if d_id:
|
||||
seen_deferreds.append(d_id)
|
||||
if not bottom_deferred:
|
||||
bottom_deferred = deferred
|
||||
if d_id in self.interested_deferreds:
|
||||
should_trace = True
|
||||
break
|
||||
|
||||
func = getattr(back.f_locals["self"], back.f_code.co_name)
|
||||
|
||||
if hasattr(func, "should_trace") or hasattr(func.im_func, "should_trace"):
|
||||
should_trace = True
|
||||
break
|
||||
|
||||
func.root_trace
|
||||
should_trace = True
|
||||
|
||||
break
|
||||
except:
|
||||
pass
|
||||
|
||||
back = back.f_back
|
||||
|
||||
if not should_trace:
|
||||
return
|
||||
|
||||
frame_id = self.get_next_id()
|
||||
name = self.get_name_for_frame(frame)
|
||||
logger.info("%s named %s", frame_id, name)
|
||||
|
||||
self.interested_deferreds.update(seen_deferreds)
|
||||
|
||||
names.reverse()
|
||||
|
||||
if bottom_deferred:
|
||||
self.deferred_frames.setdefault(
|
||||
bottom_deferred.syn_trace_defer_id, []
|
||||
).append(names)
|
||||
|
||||
logger.info("%s in %s", frame_id, bottom_deferred.syn_trace_defer_id)
|
||||
|
||||
if not hasattr(bottom_deferred, "syn_trace_registered_cb"):
|
||||
bottom_deferred.syn_trace_registered_cb = True
|
||||
|
||||
def do(res):
|
||||
return res
|
||||
|
||||
bottom_deferred.addBoth(do)
|
||||
|
||||
curr_stack = self.deferred_to_current_frames.setdefault(
|
||||
bottom_deferred.syn_trace_defer_id, []
|
||||
)
|
||||
|
||||
if curr_stack:
|
||||
logger.info("%s calls %s", curr_stack[-1], frame_id)
|
||||
else:
|
||||
logger.info("%s calls %s", bottom_deferred.syn_trace_defer_id, frame_id)
|
||||
|
||||
curr_stack.append(frame_id)
|
||||
|
||||
logger.info("%s start %d", frame_id, int(time.time() * 1000))
|
||||
|
||||
def p(frame, event, arg):
|
||||
if event == "return":
|
||||
curr_stack.pop()
|
||||
|
||||
logger.info("%s end %d", frame_id, int(time.time() * 1000))
|
||||
|
||||
return p
|
||||
|
||||
def get_name_for_frame(self, frame):
|
||||
module_name = frame.f_globals["__name__"]
|
||||
cls_instance = frame.f_locals.get("self", None)
|
||||
if cls_instance:
|
||||
cls_name = cls_instance.__class__.__name__
|
||||
name = "%s.%s.%s" % (
|
||||
module_name, cls_name, frame.f_code.co_name
|
||||
)
|
||||
else:
|
||||
name = "%s.%s" % (
|
||||
module_name, frame.f_code.co_name
|
||||
)
|
||||
return name
|
||||
|
||||
def get_next_id(self):
|
||||
i = self.next_id
|
||||
self.next_id += 1
|
||||
return i
|
||||
Reference in New Issue
Block a user