Compare commits


5 Commits

Author         SHA1        Message                                          Date
Erik Johnston  9fc4231ad8  Don't copy scopes from parent span               2020-07-14 10:41:16 +01:00
Erik Johnston  b4334ff71a  Handle empty suffixes in nested_logging_context  2020-07-14 10:07:57 +01:00
Erik Johnston  55b359bd28  Join persist events bg job with requests         2020-07-11 10:07:15 +01:00
Erik Johnston  7e6da9f5d1  Add a span to background processes               2020-07-11 10:07:15 +01:00
Erik Johnston  b71ef075f1  Add a span to Measure blocks                     2020-07-11 10:07:15 +01:00
6 changed files with 85 additions and 13 deletions

synapse/logging/context.py

@@ -367,15 +367,11 @@ class LoggingContext(object):
         # we track the current request
         record.request = self.request
 
-        # we also track the current scope:
-        record.scope = self.scope
-
     def copy_to_twisted_log_entry(self, record) -> None:
         """
         Copy logging fields from this context to a Twisted log record.
         """
         record["request"] = self.request
-        record["scope"] = self.scope
 
     def start(self, rusage: "Optional[resource._RUsage]") -> None:
         """
@@ -659,9 +655,10 @@ def nested_logging_context(
         context = parent_context  # type: LoggingContextOrSentinel
     else:
         context = current_context()
-    return LoggingContext(
-        parent_context=context, request=str(context.request) + "-" + suffix
-    )
+    name = str(context.request)
+    if suffix:
+        name = name + "-" + suffix
+    return LoggingContext(parent_context=context, request=name)
 
 
 def preserve_fn(f):
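
In effect, b4334ff71a only changes how the child context's name is built when the suffix is empty. A standalone sketch of the new behaviour (plain Python; child_name is an invented name, not Synapse code):

    def child_name(request: str, suffix: str) -> str:
        # Mirrors the new logic: only append the separator when a suffix
        # was actually supplied.
        name = str(request)
        if suffix:
            name = name + "-" + suffix
        return name

    assert child_name("GET-123", "persist") == "GET-123-persist"
    assert child_name("GET-123", "") == "GET-123"  # previously "GET-123-"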

synapse/logging/opentracing.py

@@ -526,6 +526,19 @@ def start_active_span_from_edu(
     return scope
 
 
+def get_active_span_context():
+    """Gets the active span's context, if any.
+    """
+    if not opentracing:
+        return None
+
+    active_span = opentracing.tracer.active_span
+    if not active_span:
+        return None
+
+    return active_span.context
+
+
 # Opentracing setters for tags, logs, etc
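
The new helper exists so callers can capture the active span's context now and link later work back to it. A minimal sketch of that pattern using the plain opentracing package (capture_active_span_context and run_later are illustrative names; Synapse's own wrappers add logcontext handling on top):

    import opentracing

    def capture_active_span_context():
        # Same idea as get_active_span_context(): the active span's context,
        # or None if nothing is being traced.
        active_span = opentracing.tracer.active_span
        return active_span.context if active_span else None

    def run_later(captured_context):
        # Link the deferred work back with a FOLLOWS_FROM reference rather
        # than a child-of relationship: the caller's span can finish first.
        refs = [opentracing.follows_from(captured_context)] if captured_context else []
        with opentracing.tracer.start_active_span("background-work", references=refs):
            pass  # ... the deferred work ...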

synapse/logging/scopecontextmanager.py

@@ -50,7 +50,12 @@ class LogContextScopeManager(ScopeManager):
             available.
         """
         ctx = current_context()
-        return ctx.scope
+        while ctx:
+            if ctx.scope:
+                return ctx.scope
+            ctx = ctx.parent_context
+
+        return None
 
     def activate(self, span, finish_on_close):
         """

synapse/metrics/background_process_metrics.py

@@ -25,6 +25,10 @@ from twisted.internet import defer
 from twisted.python.failure import Failure
 
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
+from synapse.logging.opentracing import (
+    get_active_span_context,
+    start_active_span_follows_from,
+)
 
 if TYPE_CHECKING:
     import resource
@@ -188,6 +192,8 @@ def run_as_background_process(desc, func, *args, **kwargs):
     follow the synapse logcontext rules.
     """
+    previous_span_context = get_active_span_context()
+
     @defer.inlineCallbacks
     def run():
         with _bg_metrics_lock:
@@ -197,7 +203,7 @@
         _background_process_start_count.labels(desc).inc()
         _background_process_in_flight_count.labels(desc).inc()
 
-        with BackgroundProcessLoggingContext(desc) as context:
+        with BackgroundProcessLoggingContext(desc, previous_span_context) as context:
             context.request = "%s-%i" % (desc, count)
 
             try:
@@ -250,12 +256,14 @@ class BackgroundProcessLoggingContext(LoggingContext):
     processes.
     """
 
-    __slots__ = ["_proc"]
+    __slots__ = ["_proc", "_span", "_previous_spans"]
 
-    def __init__(self, name: str):
+    def __init__(self, name: str, previous_span_context):
         super().__init__(name)
         self._proc = _BackgroundProcess(name, self)
+        self._span = None
+        self._previous_spans = [previous_span_context] if previous_span_context else []
 
     def start(self, rusage: "Optional[resource._RUsage]"):
         """Log context has started running (again).
@@ -269,10 +277,20 @@ class BackgroundProcessLoggingContext(LoggingContext):
         with _bg_metrics_lock:
             _background_processes_active_since_last_scrape.add(self._proc)
 
+    def __enter__(self) -> LoggingContext:
+        context = super().__enter__()
+
+        self._span = start_active_span_follows_from(self.name, self._previous_spans)
+        self._span.__enter__()
+
+        return context
+
     def __exit__(self, type, value, traceback) -> None:
         """Log context has finished.
         """
+        self._span.__exit__(type, value, traceback)
+
         super().__exit__(type, value, traceback)
 
         # The background process has finished. We explicitly remove and manually
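
Taken together, these hunks mean the caller's span context is captured when run_as_background_process is invoked, and the background process opens its own follows-from span around the work. A hedged usage sketch (purge_old_events is a made-up job name):

    from synapse.metrics.background_process_metrics import run_as_background_process

    async def _purge_old_events():
        pass  # the actual background work, which may outlive the request

    def on_request():
        # The active span context is captured here; when the process's
        # logcontext is entered, a FOLLOWS_FROM span named after the process
        # is started, so the job appears linked to this request in Jaeger.
        run_as_background_process("purge_old_events", _purge_old_events)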

synapse/storage/persist_events.py

@@ -28,6 +28,12 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.logging.opentracing import (
+    get_active_span_context,
+    set_tag,
+    start_active_span_follows_from,
+    trace,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.state import StateResolutionStore
 from synapse.storage.data_stores import DataStores
@@ -76,13 +82,16 @@ class _EventPeristenceQueue(object):
     """
 
     _EventPersistQueueItem = namedtuple(
-        "_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred")
+        "_EventPersistQueueItem",
+        ("id", "events_and_contexts", "backfilled", "deferred", "contexts"),
     )
 
     def __init__(self):
         self._event_persist_queues = {}
         self._currently_persisting_rooms = set()
+        self._entry_seq = 1
 
+    @trace(opname="persist_events_queued")
     def add_to_queue(self, room_id, events_and_contexts, backfilled):
         """Add events to the queue, with the given persist_event options.
@@ -99,25 +108,40 @@ class _EventPeristenceQueue(object):
             defer.Deferred: a deferred which will resolve once the events are
             persisted. Runs its callbacks *without* a logcontext.
         """
+        set_tag("room_id", room_id)
+
         queue = self._event_persist_queues.setdefault(room_id, deque())
         if queue:
             # if the last item in the queue has the same `backfilled` setting,
             # we can just add these new events to that item.
             end_item = queue[-1]
             if end_item.backfilled == backfilled:
+                set_tag("queue_entry_id", end_item.id)
                 end_item.events_and_contexts.extend(events_and_contexts)
+                end_item.contexts.append(get_active_span_context())
                 return end_item.deferred.observe()
 
         deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
 
+        # Generate a unique ID to track this queue entry. This is mainly used to
+        # tag the opentracing spans so that we can correlate between this
+        # current span and the span started to handle this queued item
+        # (annoyingly it seems hard in jaeger to see all spans that follow from
+        # a span).
+        queue_entry_id = "%s-%s" % (id(self), self._entry_seq)
+        self._entry_seq += 1
+
         queue.append(
             self._EventPersistQueueItem(
+                id=queue_entry_id,
                 events_and_contexts=events_and_contexts,
                 backfilled=backfilled,
                 deferred=deferred,
+                contexts=[get_active_span_context()],
            )
         )
+        set_tag("queue_entry_id", queue_entry_id)
+
         return deferred.observe()
 
     def handle_queue(self, room_id, per_item_callback):
@@ -146,7 +170,12 @@ class _EventPeristenceQueue(object):
         queue = self._get_drainining_queue(room_id)
 
         for item in queue:
             try:
-                ret = await per_item_callback(item)
+                with start_active_span_follows_from(
+                    "persist_queue", item.contexts,
+                ):
+                    set_tag("room_id", room_id)
+                    set_tag("queue_entry_id", item.id)
+                    ret = await per_item_callback(item)
             except Exception:
                 with PreserveLoggingContext():
                     item.deferred.errback()
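
The net effect of the queue changes, sketched with the plain opentracing package (Entry, enqueue and drain are illustrative names): each producer that joins a queue entry contributes its span context, and the consumer opens a single persist_queue span that follows from all of them, so one batched persist links back to every originating request.

    import opentracing

    class Entry:
        # Illustrative stand-in for _EventPersistQueueItem's contexts field.
        def __init__(self):
            self.contexts = []

    def enqueue(entry):
        # Producer: record the caller's span context on the queue entry.
        span = opentracing.tracer.active_span
        entry.contexts.append(span.context if span else None)

    def drain(entry):
        # Consumer: one span that follows from every producer's context.
        refs = [opentracing.follows_from(c) for c in entry.contexts if c]
        with opentracing.tracer.start_active_span("persist_queue", references=refs):
            pass  # ... persist the batched events ...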

synapse/util/metrics.py

@@ -22,6 +22,7 @@ from prometheus_client import Counter
 from twisted.internet import defer
 
 from synapse.logging.context import LoggingContext, current_context
+from synapse.logging.opentracing import start_active_span
 from synapse.metrics import InFlightGauge
 
 logger = logging.getLogger(__name__)
@@ -93,6 +94,7 @@ class Measure(object):
         "name",
         "_logging_context",
         "start",
+        "_span",
     ]
 
     def __init__(self, clock, name):
@@ -100,6 +102,7 @@
         self.name = name
         self._logging_context = None
         self.start = None
+        self._span = None
 
     def __enter__(self):
         if self._logging_context:
@@ -111,12 +114,19 @@
             "Measure[%s]" % (self.name,), parent_context
         )
         self._logging_context.__enter__()
+
+        self._span = start_active_span(self.name)
+        self._span.__enter__()
+
         in_flight.register((self.name,), self._update_in_flight)
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         if not self._logging_context:
             raise RuntimeError("Measure() block exited without being entered")
 
+        if self._span:
+            self._span.__exit__(exc_type, exc_val, exc_tb)
+
         duration = self.clock.time() - self.start
         usage = self._logging_context.get_resource_usage()
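
With b71ef075f1, every Measure block also opens an opentracing span named after the block, so timed sections show up in traces. A hedged usage sketch (fetch_remote_keys is an invented block name; clock is a Synapse Clock, as in __init__ above):

    from synapse.util.metrics import Measure

    def fetch_remote_keys(clock, fetcher, server_name):
        # The only change visible to callers: the timed block now also
        # appears as a "fetch_remote_keys" span in the trace.
        with Measure(clock, "fetch_remote_keys"):
            return fetcher(server_name)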