Introduce Clock.call_when_running(...) to include logcontext by default (#18944)

Introduce `Clock.call_when_running(...)` to wrap startup code in a
logcontext, ensuring we can identify which server generated the logs.

Background:

>  Ideally, nothing from the Synapse homeserver would be logged against the `sentinel` 
>  logcontext as we want to know which server the logs came from. In practice, this is not 
>  always the case yet especially outside of request handling. 
>   
>  Global things outside of Synapse (e.g. Twisted reactor code) should run in the 
>  `sentinel` logcontext. It's only when it calls into application code that a logcontext 
>  gets activated. This means the reactor should be started in the `sentinel` logcontext, 
>  and any time an awaitable yields control back to the reactor, it should reset the 
>  logcontext to be the `sentinel` logcontext. This is important to avoid leaking the 
>  current logcontext to the reactor (which would then get picked up and associated with 
>  the next thing the reactor does). 
>
> *-- `docs/log_contexts.md`

Also adds a lint to prefer `Clock.call_when_running(...)` over
`reactor.callWhenRunning(...)`

Part of https://github.com/element-hq/synapse/issues/18905
This commit is contained in:
Eric Eastwood
2025-09-22 10:27:59 -05:00
committed by GitHub
parent 83aca3f097
commit 5a9ca1e3d9
292 changed files with 693 additions and 527 deletions

1
changelog.d/18944.misc Normal file
View File

@@ -0,0 +1 @@
Introduce `Clock.call_when_running(...)` to wrap startup code in a logcontext, ensuring we can identify which server generated the logs.

View File

@@ -68,6 +68,12 @@ PROMETHEUS_METRIC_MISSING_FROM_LIST_TO_CHECK = ErrorCode(
category="per-homeserver-tenant-metrics",
)
PREFER_SYNAPSE_CLOCK_CALL_WHEN_RUNNING = ErrorCode(
"prefer-synapse-clock-call-when-running",
"`synapse.util.Clock.call_when_running` should be used instead of `reactor.callWhenRunning`",
category="synapse-reactor-clock",
)
class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
@@ -229,9 +235,43 @@ class SynapsePlugin(Plugin):
):
return check_is_cacheable_wrapper
if fullname in (
"twisted.internet.interfaces.IReactorCore.callWhenRunning",
"synapse.types.ISynapseThreadlessReactor.callWhenRunning",
"synapse.types.ISynapseReactor.callWhenRunning",
):
return check_call_when_running
return None
def check_call_when_running(ctx: MethodSigContext) -> CallableType:
"""
Ensure that the `reactor.callWhenRunning` callsites aren't used.
`synapse.util.Clock.call_when_running` should always be used instead of
`reactor.callWhenRunning`.
Since `reactor.callWhenRunning` is a reactor callback, the callback will start out
with the sentinel logcontext. `synapse.util.Clock` starts a default logcontext as we
want to know which server the logs came from.
Args:
ctx: The `FunctionSigContext` from mypy.
"""
signature: CallableType = ctx.default_signature
ctx.api.fail(
(
"Expected all `reactor.callWhenRunning` calls to use `synapse.util.Clock.call_when_running` instead. "
"This is so all Synapse code runs with a logcontext as we want to know which server the logs came from."
),
ctx.context,
code=PREFER_SYNAPSE_CLOCK_CALL_WHEN_RUNNING,
)
return signature
def analyze_prometheus_metric_classes(ctx: ClassDefContext) -> None:
"""
Cross-check the list of Prometheus metric classes against the

View File

@@ -30,7 +30,7 @@ from signedjson.sign import sign_json
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.util import json_encoder
from synapse.util.json import json_encoder
def main() -> None:

View File

@@ -54,11 +54,11 @@ from twisted.internet import defer, reactor as reactor_
from synapse.config.database import DatabaseConnectionConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import (
LoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.notifier import ReplicationNotifier
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn
from synapse.storage.databases.main import FilteringWorkerStore
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
@@ -98,8 +98,7 @@ from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStor
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.types import ISynapseReactor
from synapse.util import SYNAPSE_VERSION, Clock
from synapse.util.stringutils import random_string
from synapse.util import SYNAPSE_VERSION
# Cast safety: Twisted does some naughty magic which replaces the
# twisted.internet.reactor module with a Reactor instance at runtime.
@@ -318,31 +317,16 @@ class Store(
)
class MockHomeserver:
class MockHomeserver(HomeServer):
DATASTORE_CLASS = DataStore
def __init__(self, config: HomeServerConfig):
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server.server_name
self.version_string = SYNAPSE_VERSION
self.instance_id = random_string(5)
def get_clock(self) -> Clock:
return self.clock
def get_reactor(self) -> ISynapseReactor:
return reactor
def get_instance_id(self) -> str:
return self.instance_id
def get_instance_name(self) -> str:
return "master"
def should_send_federation(self) -> bool:
return False
def get_replication_notifier(self) -> ReplicationNotifier:
return ReplicationNotifier()
super().__init__(
hostname=config.server.server_name,
config=config,
reactor=reactor,
version_string=f"Synapse/{SYNAPSE_VERSION}",
)
class Porter:
@@ -351,12 +335,12 @@ class Porter:
sqlite_config: Dict[str, Any],
progress: "Progress",
batch_size: int,
hs_config: HomeServerConfig,
hs: HomeServer,
):
self.sqlite_config = sqlite_config
self.progress = progress
self.batch_size = batch_size
self.hs_config = hs_config
self.hs = hs
async def setup_table(self, table: str) -> Tuple[str, int, int, int, int]:
if table in APPEND_ONLY_TABLES:
@@ -676,8 +660,7 @@ class Porter:
engine = create_engine(db_config.config)
hs = MockHomeserver(self.hs_config)
server_name = hs.hostname
server_name = self.hs.hostname
with make_conn(
db_config=db_config,
@@ -688,16 +671,16 @@ class Porter:
engine.check_database(
db_conn, allow_outdated_version=allow_outdated_version
)
prepare_database(db_conn, engine, config=self.hs_config)
prepare_database(db_conn, engine, config=self.hs.config)
# Type safety: ignore that we're using Mock homeservers here.
store = Store(
DatabasePool(
hs, # type: ignore[arg-type]
self.hs,
db_config,
engine,
),
db_conn,
hs, # type: ignore[arg-type]
self.hs,
)
db_conn.commit()
@@ -795,7 +778,7 @@ class Porter:
return
self.postgres_store = self.build_db_store(
self.hs_config.database.get_single_database()
self.hs.config.database.get_single_database()
)
await self.remove_ignored_background_updates_from_database()
@@ -1584,6 +1567,8 @@ def main() -> None:
config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")
hs = MockHomeserver(config)
def start(stdscr: Optional["curses.window"] = None) -> None:
progress: Progress
if stdscr:
@@ -1595,15 +1580,14 @@ def main() -> None:
sqlite_config=sqlite_config,
progress=progress,
batch_size=args.batch_size,
hs_config=config,
hs=hs,
)
@defer.inlineCallbacks
def run() -> Generator["defer.Deferred[Any]", Any, None]:
with LoggingContext("synapse_port_db_run"):
yield defer.ensureDeferred(porter.run())
yield defer.ensureDeferred(porter.run())
reactor.callWhenRunning(run)
hs.get_clock().call_when_running(run)
reactor.run()

View File

@@ -74,7 +74,7 @@ def run_background_updates(hs: HomeServer) -> None:
)
)
reactor.callWhenRunning(run)
hs.get_clock().call_when_running(run)
reactor.run()

View File

@@ -43,9 +43,9 @@ from synapse.logging.opentracing import (
from synapse.metrics import SERVER_NAME_LABEL
from synapse.synapse_rust.http_client import HttpClient
from synapse.types import JsonDict, Requester, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
from synapse.util.json import json_decoder
from . import introspection_response_timer

View File

@@ -48,9 +48,9 @@ from synapse.logging.opentracing import (
from synapse.metrics import SERVER_NAME_LABEL
from synapse.synapse_rust.http_client import HttpClient
from synapse.types import Requester, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
from synapse.util.json import json_decoder
from . import introspection_response_timer

View File

@@ -30,7 +30,7 @@ from typing import Any, Dict, List, Optional, Union
from twisted.web import http
from synapse.util import json_decoder
from synapse.util.json import json_decoder
if typing.TYPE_CHECKING:
from synapse.config.homeserver import HomeServerConfig

View File

@@ -26,7 +26,7 @@ from synapse.api.errors import LimitExceededError
from synapse.config.ratelimiting import RatelimitSettings
from synapse.storage.databases.main import DataStore
from synapse.types import Requester
from synapse.util import Clock
from synapse.util.clock import Clock
if TYPE_CHECKING:
# To avoid circular imports:

View File

@@ -241,7 +241,7 @@ def redirect_stdio_to_logs() -> None:
def register_start(
cb: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
hs: "HomeServer", cb: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
) -> None:
"""Register a callback with the reactor, to be called once it is running
@@ -278,7 +278,8 @@ def register_start(
# on as normal.
os._exit(1)
reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
clock = hs.get_clock()
clock.call_when_running(lambda: defer.ensureDeferred(wrapper()))
def listen_metrics(bind_addresses: StrCollection, port: int) -> None:

View File

@@ -356,11 +356,9 @@ def start(config_options: List[str]) -> None:
handle_startup_exception(e)
async def start() -> None:
# Re-establish log context now that we're back from the reactor
with LoggingContext("start"):
await _base.start(hs)
await _base.start(hs)
register_start(start)
register_start(hs, start)
# redirect stdio to the logs, if configured.
if not hs.config.logging.no_redirect_stdio:

View File

@@ -377,19 +377,17 @@ def setup(config_options: List[str]) -> SynapseHomeServer:
handle_startup_exception(e)
async def start() -> None:
# Re-establish log context now that we're back from the reactor
with LoggingContext("start"):
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc.oidc_enabled:
oidc = hs.get_oidc_handler()
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc.oidc_enabled:
oidc = hs.get_oidc_handler()
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()
await _base.start(hs)
await _base.start(hs)
hs.get_datastores().main.db_pool.updates.start_doing_background_updates()
hs.get_datastores().main.db_pool.updates.start_doing_background_updates()
register_start(start)
register_start(hs, start)
return hs

View File

@@ -84,7 +84,7 @@ from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main import DataStore
from synapse.types import DeviceListUpdates, JsonMapping
from synapse.util import Clock
from synapse.util.clock import Clock
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -38,7 +38,7 @@ from synapse.storage.databases.main import DataStore
from synapse.synapse_rust.events import EventInternalMetadata
from synapse.types import EventID, JsonDict, StrCollection
from synapse.types.state import StateFilter
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.stringutils import random_string
if TYPE_CHECKING:

View File

@@ -178,7 +178,7 @@ from synapse.types import (
StrCollection,
get_domain_from_id,
)
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter

View File

@@ -36,7 +36,7 @@ from synapse.logging.opentracing import (
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.json import json_decoder
from synapse.util.metrics import measure_func
if TYPE_CHECKING:

View File

@@ -62,7 +62,7 @@ class DeactivateAccountHandler:
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
if hs.config.worker.worker_app is None:
hs.get_reactor().callWhenRunning(self._start_user_parting)
hs.get_clock().call_when_running(self._start_user_parting)
else:
self._notify_account_deactivated_client = (
ReplicationNotifyAccountDeactivatedServlet.make_client(hs)

View File

@@ -1002,7 +1002,7 @@ class DeviceWriterHandler(DeviceHandler):
# rolling-restarting Synapse.
if self._is_main_device_list_writer:
# On start up check if there are any updates pending.
hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)
hs.get_clock().call_when_running(self._handle_new_device_update_async)
self.device_list_updater = DeviceListUpdater(hs, self)
hs.get_federation_registry().register_edu_handler(
EduTypes.DEVICE_LIST_UPDATE,

View File

@@ -34,7 +34,7 @@ from synapse.logging.opentracing import (
set_tag,
)
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.json import json_encoder
from synapse.util.stringutils import random_string
if TYPE_CHECKING:

View File

@@ -44,9 +44,9 @@ from synapse.types import (
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.cancellation import cancellable
from synapse.util.json import json_decoder
from synapse.util.retryutils import (
NotRetryingDestination,
filter_destinations_by_retry_limiter,

View File

@@ -39,8 +39,8 @@ from synapse.http import RequestTimedOutError
from synapse.http.client import SimpleHttpClient
from synapse.http.site import SynapseRequest
from synapse.types import JsonDict, Requester
from synapse.util import json_decoder
from synapse.util.hash import sha256_and_url_safe_base64
from synapse.util.json import json_decoder
from synapse.util.stringutils import (
assert_valid_client_secret,
random_string,

View File

@@ -81,9 +81,10 @@ from synapse.types import (
create_requester,
)
from synapse.types.state import StateFilter
from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError
from synapse.util import log_failure, unwrapFirstError
from synapse.util.async_helpers import Linearizer, gather_results
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.json import json_decoder, json_encoder
from synapse.util.metrics import measure_func
from synapse.visibility import get_effective_room_visibility_from_state

View File

@@ -67,8 +67,9 @@ from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
from synapse.module_api import ModuleApi
from synapse.types import JsonDict, UserID, map_username_to_mxid_localpart
from synapse.util import Clock, json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.clock import Clock
from synapse.util.json import json_decoder
from synapse.util.macaroons import MacaroonGenerator, OidcSessionData
from synapse.util.templates import _localpart_from_email_filter

View File

@@ -27,7 +27,7 @@ from twisted.web.client import PartialDownloadError
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, LoginError, SynapseError
from synapse.util import json_decoder
from synapse.util.json import json_decoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -87,8 +87,8 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import ISynapseReactor, StrSequence
from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
from synapse.util.json import json_decoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -49,7 +49,7 @@ from synapse.http.federation.well_known_resolver import WellKnownResolver
from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.clock import Clock
logger = logging.getLogger(__name__)

View File

@@ -27,7 +27,6 @@ from typing import Callable, Dict, Optional, Tuple
import attr
from twisted.internet import defer
from twisted.internet.interfaces import IReactorTime
from twisted.web.client import RedirectAgent
from twisted.web.http import stringToDatetime
from twisted.web.http_headers import Headers
@@ -35,8 +34,10 @@ from twisted.web.iweb import IAgent, IResponse
from synapse.http.client import BodyExceededMaxSize, read_body_with_max_size
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock, json_decoder
from synapse.types import ISynapseThreadlessReactor
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.clock import Clock
from synapse.util.json import json_decoder
from synapse.util.metrics import Measure
# period to cache .well-known results for by default
@@ -88,7 +89,7 @@ class WellKnownResolver:
def __init__(
self,
server_name: str,
reactor: IReactorTime,
reactor: ISynapseThreadlessReactor,
agent: IAgent,
user_agent: bytes,
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,

View File

@@ -89,8 +89,8 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
from synapse.util.json import json_decoder
from synapse.util.metrics import Measure
from synapse.util.stringutils import parse_and_validate_server_name

View File

@@ -52,10 +52,11 @@ from zope.interface import implementer
from twisted.internet import defer, interfaces, reactor
from twisted.internet.defer import CancelledError
from twisted.internet.interfaces import IReactorTime
from twisted.python import failure
from twisted.web import resource
from synapse.types import ISynapseThreadlessReactor
try:
from twisted.web.pages import notFound
except ImportError:
@@ -77,10 +78,11 @@ from synapse.api.errors import (
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
from synapse.util import Clock, json_encoder
from synapse.util.caches import intern_dict
from synapse.util.cancellation import is_function_cancellable
from synapse.util.clock import Clock
from synapse.util.iterutils import chunk_seq
from synapse.util.json import json_encoder
if TYPE_CHECKING:
import opentracing
@@ -410,7 +412,7 @@ class DirectServeJsonResource(_AsyncResource):
clock: Optional[Clock] = None,
):
if clock is None:
clock = Clock(cast(IReactorTime, reactor))
clock = Clock(cast(ISynapseThreadlessReactor, reactor))
super().__init__(clock, extract_context)
self.canonical_json = canonical_json
@@ -589,7 +591,7 @@ class DirectServeHtmlResource(_AsyncResource):
clock: Optional[Clock] = None,
):
if clock is None:
clock = Clock(cast(IReactorTime, reactor))
clock = Clock(cast(ISynapseThreadlessReactor, reactor))
super().__init__(clock, extract_context)

View File

@@ -51,7 +51,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.http import redact_uri
from synapse.http.server import HttpServer
from synapse.types import JsonDict, RoomAlias, RoomID, StrCollection
from synapse.util import json_decoder
from synapse.util.json import json_decoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -60,8 +60,18 @@ class PeriodicallyFlushingMemoryHandler(MemoryHandler):
else:
reactor_to_use = reactor
# call our hook when the reactor start up
reactor_to_use.callWhenRunning(on_reactor_running)
# Call our hook when the reactor start up
#
# type-ignore: Ideally, we'd use `Clock.call_when_running(...)`, but
# `PeriodicallyFlushingMemoryHandler` is instantiated via Python logging
# configuration, so it's not straightforward to pass in the homeserver's clock
# (and we don't want to burden other peoples logging config with the details).
#
# The important reason why we want to use `Clock.call_when_running` is so that
# the callback runs with a logcontext as we want to know which server the logs
# came from. But since we don't log anything in the callback, it's safe to
# ignore the lint here.
reactor_to_use.callWhenRunning(on_reactor_running) # type: ignore[prefer-synapse-clock-call-when-running]
def shouldFlush(self, record: LogRecord) -> bool:
"""

View File

@@ -204,7 +204,7 @@ from twisted.web.http import Request
from twisted.web.http_headers import Headers
from synapse.config import ConfigError
from synapse.util import json_decoder, json_encoder
from synapse.util.json import json_decoder, json_encoder
if TYPE_CHECKING:
from synapse.http.site import SynapseRequest

View File

@@ -54,8 +54,8 @@ from synapse.logging.context import (
make_deferred_yieldable,
run_in_background,
)
from synapse.util import Clock
from synapse.util.async_helpers import DeferredEvent
from synapse.util.clock import Clock
from synapse.util.stringutils import is_ascii
if TYPE_CHECKING:

View File

@@ -55,7 +55,7 @@ from synapse.api.errors import NotFoundError
from synapse.logging.context import defer_to_thread, run_in_background
from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
from synapse.media._base import ThreadedFileSender
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.file_consumer import BackgroundFileConsumer
from ..types import JsonDict

View File

@@ -27,7 +27,7 @@ import attr
from synapse.media.preview_html import parse_html_description
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.json import json_decoder
if TYPE_CHECKING:
from lxml import etree

View File

@@ -46,9 +46,9 @@ from synapse.media.oembed import OEmbedProvider
from synapse.media.preview_html import decode_body, parse_html_to_open_graph
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict, UserID
from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.json import json_encoder
from synapse.util.stringutils import random_string
if TYPE_CHECKING:

View File

@@ -158,9 +158,9 @@ from synapse.types import (
create_requester,
)
from synapse.types.state import StateFilter
from synapse.util import Clock
from synapse.util.async_helpers import maybe_awaitable
from synapse.util.caches.descriptors import CachedFunction, cached as _cached
from synapse.util.clock import Clock
from synapse.util.frozenutils import freeze
if TYPE_CHECKING:

View File

@@ -29,7 +29,7 @@ import logging
from typing import List, Optional, Tuple, Type, TypeVar
from synapse.replication.tcp.streams._base import StreamRow
from synapse.util import json_decoder, json_encoder
from synapse.util.json import json_decoder, json_encoder
logger = logging.getLogger(__name__)

View File

@@ -27,7 +27,7 @@ from prometheus_client import Counter, Histogram
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable
from synapse.metrics import SERVER_NAME_LABEL
from synapse.util import json_decoder, json_encoder
from synapse.util.json import json_decoder, json_encoder
if TYPE_CHECKING:
from txredisapi import ConnectionHandler

View File

@@ -55,7 +55,7 @@ from synapse.replication.tcp.commands import (
ServerCommand,
parse_command_from_line,
)
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.stringutils import random_string
if TYPE_CHECKING:

View File

@@ -58,8 +58,8 @@ from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname
from synapse.rest.admin.experimental_features import ExperimentalFeature
from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
from synapse.types.rest.client import SlidingSyncBody
from synapse.util import json_decoder
from synapse.util.caches.lrucache import LruCache
from synapse.util.json import json_decoder
from ._base import client_patterns, set_timeline_upper_limit

View File

@@ -38,8 +38,8 @@ from synapse.http.servlet import (
from synapse.storage.keys import FetchKeyResultForRemote
from synapse.types import JsonDict
from synapse.types.rest import RequestBodyModel
from synapse.util import json_decoder
from synapse.util.async_helpers import yieldable_gather_results
from synapse.util.json import json_decoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -28,7 +28,7 @@ from synapse.api.errors import NotFoundError
from synapse.http.server import DirectServeJsonResource
from synapse.http.site import SynapseRequest
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.json import json_encoder
from synapse.util.stringutils import parse_server_name
if TYPE_CHECKING:

View File

@@ -156,7 +156,7 @@ from synapse.storage.controllers import StorageControllers
from synapse.streams.events import EventSources
from synapse.synapse_rust.rendezvous import RendezvousHandler
from synapse.types import DomainSpecificString, ISynapseReactor
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.distributor import Distributor
from synapse.util.macaroons import MacaroonGenerator
from synapse.util.ratelimitutils import FederationRateLimiter

View File

@@ -29,8 +29,8 @@ from synapse.storage.database import (
make_in_list_sql_clause, # noqa: F401
)
from synapse.types import get_domain_from_id
from synapse.util import json_decoder
from synapse.util.caches.descriptors import CachedFunction
from synapse.util.json import json_decoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -45,7 +45,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Connection, Cursor
from synapse.types import JsonDict, StrCollection
from synapse.util import Clock, json_encoder
from synapse.util.clock import Clock
from synapse.util.json import json_encoder
from . import engines

View File

@@ -48,9 +48,9 @@ from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from synapse.storage.invite_rule import InviteRulesConfig
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -42,8 +42,8 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import DeviceListUpdates, JsonMapping
from synapse.util import json_encoder
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -32,7 +32,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.util import json_encoder
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -22,7 +22,8 @@ from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction, StoreError
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, RoomID
from synapse.util import json_encoder, stringutils as stringutils
from synapse.util import stringutils
from synapse.util.json import json_encoder
logger = logging.getLogger(__name__)

View File

@@ -53,10 +53,11 @@ from synapse.storage.database import (
)
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict, StrCollection
from synapse.util import Duration, json_encoder
from synapse.util import Duration
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter
from synapse.util.json import json_encoder
from synapse.util.stringutils import parse_and_validate_server_name
if TYPE_CHECKING:

View File

@@ -64,11 +64,11 @@ from synapse.types import (
StrCollection,
get_verify_key_from_cross_signing_key,
)
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.json import json_decoder, json_encoder
from synapse.util.stringutils import shortstr
if TYPE_CHECKING:

View File

@@ -41,7 +41,7 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.types import JsonDict, JsonSerializable, StreamKeyType
from synapse.util import json_encoder
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -61,10 +61,10 @@ from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict, JsonMapping, MultiWriterStreamToken
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.json import json_decoder, json_encoder
if TYPE_CHECKING:
from synapse.handlers.e2e_keys import SignatureListItem

View File

@@ -59,11 +59,11 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict, StrCollection
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -107,8 +107,8 @@ from synapse.storage.database import (
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.types import JsonDict, StrCollection
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -83,9 +83,9 @@ from synapse.types import (
)
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
from synapse.types.state import StateFilter
from synapse.util import json_encoder
from synapse.util.events import get_plain_text_topic_from_event_content
from synapse.util.iterutils import batch_iter, sorted_topologically
from synapse.util.json import json_encoder
from synapse.util.stringutils import non_null_str_or_none
if TYPE_CHECKING:

View File

@@ -58,8 +58,8 @@ from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
from synapse.types.state import StateFilter
from synapse.types.storage import _BackgroundUpdates
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -38,7 +38,7 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.stringutils import random_string
if TYPE_CHECKING:

View File

@@ -56,10 +56,11 @@ from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundExc
from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator
from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules
from synapse.types import JsonDict
from synapse.util import json_encoder, unwrapFirstError
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import gather_results
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -42,8 +42,8 @@ from synapse.storage.database import (
)
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -55,10 +55,10 @@ from synapse.types import (
PersistedPosition,
StrCollection,
)
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -65,8 +65,8 @@ from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator
from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.json import json_encoder
from synapse.util.stringutils import MXC_REGEX
if TYPE_CHECKING:

View File

@@ -30,7 +30,7 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -35,8 +35,8 @@ from synapse.types.handlers.sliding_sync import (
RoomStatusMap,
RoomSyncConfig,
)
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -30,8 +30,8 @@ from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.json import json_encoder
logger = logging.getLogger(__name__)

View File

@@ -29,7 +29,7 @@ from synapse.storage.database import (
make_in_list_sql_clause,
)
from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
from synapse.util import json_encoder
from synapse.util.json import json_encoder
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -27,7 +27,8 @@ from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.types import JsonDict
from synapse.util import json_encoder, stringutils
from synapse.util import stringutils
from synapse.util.json import json_encoder
@attr.s(slots=True, auto_attribs=True)

View File

@@ -116,13 +116,27 @@ StrSequence = Union[Tuple[str, ...], List[str]]
# Note that this seems to require inheriting *directly* from Interface in order
# for mypy-zope to realize it is an interface.
class ISynapseReactor(
class ISynapseThreadlessReactor(
IReactorTCP,
IReactorSSL,
IReactorUNIX,
IReactorPluggableNameResolver,
IReactorTime,
IReactorCore,
Interface,
):
"""
The interfaces necessary for Synapse to function (without threads).
Helpful because we use `twisted.internet.testing.MemoryReactorClock` in tests which
doesn't implement `IReactorThreads`.
"""
# Note that this seems to require inheriting *directly* from Interface in order
# for mypy-zope to realize it is an interface.
class ISynapseReactor(
ISynapseThreadlessReactor,
IReactorThreads,
Interface,
):

View File

@@ -20,12 +20,9 @@
#
import collections.abc
import json
import logging
import typing
from typing import (
Any,
Callable,
Dict,
Iterator,
Mapping,
@@ -36,17 +33,11 @@ from typing import (
)
import attr
from immutabledict import immutabledict
from matrix_common.versionstring import get_distribution_version_string
from typing_extensions import ParamSpec
from twisted.internet import defer, task
from twisted.internet.interfaces import IDelayedCall, IReactorTime
from twisted.internet.task import LoopingCall
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.logging import context
if typing.TYPE_CHECKING:
pass
@@ -62,41 +53,6 @@ class Duration:
DAY_MS = 24 * HOUR_MS
def _reject_invalid_json(val: Any) -> None:
"""Do not allow Infinity, -Infinity, or NaN values in JSON."""
raise ValueError("Invalid JSON value: '%s'" % val)
def _handle_immutabledict(obj: Any) -> Dict[Any, Any]:
"""Helper for json_encoder. Makes immutabledicts serializable by returning
the underlying dict
"""
if type(obj) is immutabledict:
# fishing the protected dict out of the object is a bit nasty,
# but we don't really want the overhead of copying the dict.
try:
# Safety: we catch the AttributeError immediately below.
return obj._dict
except AttributeError:
# If all else fails, resort to making a copy of the immutabledict
return dict(obj)
raise TypeError(
"Object of type %s is not JSON serializable" % obj.__class__.__name__
)
# A custom JSON encoder which:
# * handles immutabledicts
# * produces valid JSON (no NaNs etc)
# * reduces redundant whitespace
json_encoder = json.JSONEncoder(
allow_nan=False, separators=(",", ":"), default=_handle_immutabledict
)
# Create a custom decoder to reject Python extensions to JSON.
json_decoder = json.JSONDecoder(parse_constant=_reject_invalid_json)
def unwrapFirstError(failure: Failure) -> Failure:
# Deprecated: you probably just want to catch defer.FirstError and reraise
# the subFailure's value, which will do a better job of preserving stacktraces.
@@ -105,129 +61,6 @@ def unwrapFirstError(failure: Failure) -> Failure:
return failure.value.subFailure
P = ParamSpec("P")
@attr.s(slots=True)
class Clock:
"""
A Clock wraps a Twisted reactor and provides utilities on top of it.
Args:
reactor: The Twisted reactor to use.
"""
_reactor: IReactorTime = attr.ib()
async def sleep(self, seconds: float) -> None:
d: defer.Deferred[float] = defer.Deferred()
with context.PreserveLoggingContext():
self._reactor.callLater(seconds, d.callback, seconds)
await d
def time(self) -> float:
"""Returns the current system time in seconds since epoch."""
return self._reactor.seconds()
def time_msec(self) -> int:
"""Returns the current system time in milliseconds since epoch."""
return int(self.time() * 1000)
def looping_call(
self,
f: Callable[P, object],
msec: float,
*args: P.args,
**kwargs: P.kwargs,
) -> LoopingCall:
"""Call a function repeatedly.
Waits `msec` initially before calling `f` for the first time.
If the function given to `looping_call` returns an awaitable/deferred, the next
call isn't scheduled until after the returned awaitable has finished. We get
this functionality thanks to this function being a thin wrapper around
`twisted.internet.task.LoopingCall`.
Note that the function will be called with no logcontext, so if it is anything
other than trivial, you probably want to wrap it in run_as_background_process.
Args:
f: The function to call repeatedly.
msec: How long to wait between calls in milliseconds.
*args: Positional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
return self._looping_call_common(f, msec, False, *args, **kwargs)
def looping_call_now(
self,
f: Callable[P, object],
msec: float,
*args: P.args,
**kwargs: P.kwargs,
) -> LoopingCall:
"""Call a function immediately, and then repeatedly thereafter.
As with `looping_call`: subsequent calls are not scheduled until after the
the Awaitable returned by a previous call has finished.
Also as with `looping_call`: the function is called with no logcontext and
you probably want to wrap it in `run_as_background_process`.
Args:
f: The function to call repeatedly.
msec: How long to wait between calls in milliseconds.
*args: Positional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
return self._looping_call_common(f, msec, True, *args, **kwargs)
def _looping_call_common(
self,
f: Callable[P, object],
msec: float,
now: bool,
*args: P.args,
**kwargs: P.kwargs,
) -> LoopingCall:
"""Common functionality for `looping_call` and `looping_call_now`"""
call = task.LoopingCall(f, *args, **kwargs)
call.clock = self._reactor
d = call.start(msec / 1000.0, now=now)
d.addErrback(log_failure, "Looping call died", consumeErrors=False)
return call
def call_later(
self, delay: float, callback: Callable, *args: Any, **kwargs: Any
) -> IDelayedCall:
"""Call something later
Note that the function will be called with no logcontext, so if it is anything
other than trivial, you probably want to wrap it in run_as_background_process.
Args:
delay: How long to wait in seconds.
callback: Function to call
*args: Postional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
with context.PreserveLoggingContext():
callback(*args, **kwargs)
with context.PreserveLoggingContext():
return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None:
try:
timer.cancel()
except Exception:
if not ignore_errs:
raise
def log_failure(
failure: Failure, msg: str, consumeErrors: bool = True
) -> Optional[Failure]:

View File

@@ -65,7 +65,8 @@ from synapse.logging.context import (
run_coroutine_in_background,
run_in_background,
)
from synapse.util import Clock
from synapse.types import ISynapseThreadlessReactor
from synapse.util.clock import Clock
logger = logging.getLogger(__name__)
@@ -566,7 +567,7 @@ class Linearizer:
if not clock:
from twisted.internet import reactor
clock = Clock(cast(IReactorTime, reactor))
clock = Clock(cast(ISynapseThreadlessReactor, reactor))
self._clock = clock
self.max_count = max_count

View File

@@ -39,7 +39,7 @@ from twisted.internet import defer
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
from synapse.util.clock import Clock
logger = logging.getLogger(__name__)

View File

@@ -29,8 +29,8 @@ from twisted.internet import defer
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
from synapse.util.caches import EvictionReason, register_cache
from synapse.util.clock import Clock
logger = logging.getLogger(__name__)

View File

@@ -46,20 +46,21 @@ from typing import (
)
from twisted.internet import defer, reactor
from twisted.internet.interfaces import IReactorTime
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import (
run_as_background_process,
)
from synapse.metrics.jemalloc import get_jemalloc_stats
from synapse.util import Clock, caches
from synapse.types import ISynapseThreadlessReactor
from synapse.util import caches
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
from synapse.util.caches.treecache import (
TreeCache,
iterate_tree_cache_entry,
iterate_tree_cache_items,
)
from synapse.util.clock import Clock
from synapse.util.linked_list import ListNode
if TYPE_CHECKING:
@@ -496,7 +497,7 @@ class LruCache(Generic[KT, VT]):
# Default `clock` to something sensible. Note that we rename it to
# `real_clock` so that mypy doesn't think its still `Optional`.
if clock is None:
real_clock = Clock(cast(IReactorTime, reactor))
real_clock = Clock(cast(ISynapseThreadlessReactor, reactor))
else:
real_clock = clock

View File

@@ -41,9 +41,9 @@ from synapse.logging.opentracing import (
start_active_span,
start_active_span_follows_from,
)
from synapse.util import Clock
from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred
from synapse.util.caches import EvictionReason, register_cache
from synapse.util.clock import Clock
logger = logging.getLogger(__name__)

208
synapse/util/clock.py Normal file
View File

@@ -0,0 +1,208 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl_3.0.html>.
#
#
from typing import (
Any,
Callable,
)
import attr
from typing_extensions import ParamSpec
from twisted.internet import defer, task
from twisted.internet.interfaces import IDelayedCall
from twisted.internet.task import LoopingCall
from synapse.logging import context
from synapse.types import ISynapseThreadlessReactor
from synapse.util import log_failure
P = ParamSpec("P")
@attr.s(slots=True)
class Clock:
"""
A Clock wraps a Twisted reactor and provides utilities on top of it.
Args:
reactor: The Twisted reactor to use.
"""
_reactor: ISynapseThreadlessReactor = attr.ib()
async def sleep(self, seconds: float) -> None:
d: defer.Deferred[float] = defer.Deferred()
with context.PreserveLoggingContext():
self._reactor.callLater(seconds, d.callback, seconds)
await d
def time(self) -> float:
"""Returns the current system time in seconds since epoch."""
return self._reactor.seconds()
def time_msec(self) -> int:
"""Returns the current system time in milliseconds since epoch."""
return int(self.time() * 1000)
def looping_call(
self,
f: Callable[P, object],
msec: float,
*args: P.args,
**kwargs: P.kwargs,
) -> LoopingCall:
"""Call a function repeatedly.
Waits `msec` initially before calling `f` for the first time.
If the function given to `looping_call` returns an awaitable/deferred, the next
call isn't scheduled until after the returned awaitable has finished. We get
this functionality thanks to this function being a thin wrapper around
`twisted.internet.task.LoopingCall`.
Note that the function will be called with no logcontext, so if it is anything
other than trivial, you probably want to wrap it in run_as_background_process.
Args:
f: The function to call repeatedly.
msec: How long to wait between calls in milliseconds.
*args: Positional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
return self._looping_call_common(f, msec, False, *args, **kwargs)
def looping_call_now(
self,
f: Callable[P, object],
msec: float,
*args: P.args,
**kwargs: P.kwargs,
) -> LoopingCall:
"""Call a function immediately, and then repeatedly thereafter.
As with `looping_call`: subsequent calls are not scheduled until after the
the Awaitable returned by a previous call has finished.
Also as with `looping_call`: the function is called with no logcontext and
you probably want to wrap it in `run_as_background_process`.
Args:
f: The function to call repeatedly.
msec: How long to wait between calls in milliseconds.
*args: Positional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
return self._looping_call_common(f, msec, True, *args, **kwargs)
def _looping_call_common(
self,
f: Callable[P, object],
msec: float,
now: bool,
*args: P.args,
**kwargs: P.kwargs,
) -> LoopingCall:
"""Common functionality for `looping_call` and `looping_call_now`"""
call = task.LoopingCall(f, *args, **kwargs)
call.clock = self._reactor
d = call.start(msec / 1000.0, now=now)
d.addErrback(log_failure, "Looping call died", consumeErrors=False)
return call
def call_later(
self, delay: float, callback: Callable, *args: Any, **kwargs: Any
) -> IDelayedCall:
"""Call something later
Note that the function will be called with no logcontext, so if it is anything
other than trivial, you probably want to wrap it in run_as_background_process.
Args:
delay: How long to wait in seconds.
callback: Function to call
*args: Postional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
with context.PreserveLoggingContext():
callback(*args, **kwargs)
with context.PreserveLoggingContext():
return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None:
try:
timer.cancel()
except Exception:
if not ignore_errs:
raise
def call_when_running(
self,
callback: Callable[P, object],
*args: P.args,
**kwargs: P.kwargs,
) -> None:
"""
Call a function when the reactor is running.
If the reactor has not started, the callable will be scheduled to run when it
does start. Otherwise, the callable will be invoked immediately.
Args:
callback: Function to call
*args: Postional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
def wrapped_callback(*args: Any, **kwargs: Any) -> None:
# Since this callback can be invoked immediately if the reactor is already
# running, we can't always assume that we're running in the sentinel
# logcontext (i.e. we can't assert that we're in the sentinel context like
# we can in other methods).
#
# We will only be running in the sentinel logcontext if the reactor was not
# running when `call_when_running` was invoked and later starts up.
#
# assert context.current_context() is context.SENTINEL_CONTEXT
# Because this is a callback from the reactor, we will be using the
# `sentinel` log context at this point. We want the function to log with
# some logcontext as we want to know which server the logs came from.
#
# We use `PreserveLoggingContext` to prevent our new `call_when_running`
# logcontext from finishing as soon as we exit this function, in case `f`
# returns an awaitable/deferred which would continue running and may try to
# restore the `loop_call` context when it's done (because it's trying to
# adhere to the Synapse logcontext rules.)
#
# This also ensures that we return to the `sentinel` context when we exit
# this function and yield control back to the reactor to avoid leaking the
# current logcontext to the reactor (which would then get picked up and
# associated with the next thing the reactor does)
with context.PreserveLoggingContext(
context.LoggingContext("call_when_running")
):
# We use `run_in_background` to reset the logcontext after `f` (or the
# awaitable returned by `f`) completes to avoid leaking the current
# logcontext to the reactor
context.run_in_background(callback, *args, **kwargs)
# We can ignore the lint here since this class is the one location
# callWhenRunning should be called.
self._reactor.callWhenRunning(wrapped_callback, *args, **kwargs) # type: ignore[prefer-synapse-clock-call-when-running]

57
synapse/util/json.py Normal file
View File

@@ -0,0 +1,57 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl_3.0.html>.
#
#
import json
from typing import (
Any,
Dict,
)
from immutabledict import immutabledict
def _reject_invalid_json(val: Any) -> None:
"""Do not allow Infinity, -Infinity, or NaN values in JSON."""
raise ValueError("Invalid JSON value: '%s'" % val)
def _handle_immutabledict(obj: Any) -> Dict[Any, Any]:
"""Helper for json_encoder. Makes immutabledicts serializable by returning
the underlying dict
"""
if type(obj) is immutabledict:
# fishing the protected dict out of the object is a bit nasty,
# but we don't really want the overhead of copying the dict.
try:
# Safety: we catch the AttributeError immediately below.
return obj._dict
except AttributeError:
# If all else fails, resort to making a copy of the immutabledict
return dict(obj)
raise TypeError(
"Object of type %s is not JSON serializable" % obj.__class__.__name__
)
# A custom JSON encoder which:
# * handles immutabledicts
# * produces valid JSON (no NaNs etc)
# * reduces redundant whitespace
json_encoder = json.JSONEncoder(
allow_nan=False, separators=(",", ":"), default=_handle_immutabledict
)
# Create a custom decoder to reject Python extensions to JSON.
json_decoder = json.JSONDecoder(parse_constant=_reject_invalid_json)

View File

@@ -28,7 +28,8 @@ import attr
import pymacaroons
from pymacaroons.exceptions import MacaroonVerificationFailedException
from synapse.util import Clock, stringutils
from synapse.util import stringutils
from synapse.util.clock import Clock
MacaroonType = Literal["access", "delete_pusher", "session"]

View File

@@ -42,7 +42,7 @@ from synapse.logging.context import (
current_context,
)
from synapse.metrics import SERVER_NAME_LABEL, InFlightGauge
from synapse.util import Clock
from synapse.util.clock import Clock
logger = logging.getLogger(__name__)

View File

@@ -53,7 +53,7 @@ from synapse.logging.context import (
)
from synapse.logging.opentracing import start_active_span
from synapse.metrics import SERVER_NAME_LABEL, Histogram, LaterGauge
from synapse.util import Clock
from synapse.util.clock import Clock
if typing.TYPE_CHECKING:
from contextlib import _GeneratorContextManager

View File

@@ -27,7 +27,7 @@ from synapse.api.errors import CodeMessageException
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import DataStore
from synapse.types import StrCollection
from synapse.util import Clock
from synapse.util.clock import Clock
if TYPE_CHECKING:
from synapse.notifier import Notifier

View File

@@ -55,7 +55,7 @@ from synapse.types import (
get_domain_from_id,
)
from synapse.types.state import StateFilter
from synapse.util import Clock
from synapse.util.clock import Clock
logger = logging.getLogger(__name__)
filtered_event_logger = logging.getLogger("synapse.visibility.filtered_event_debug")

View File

@@ -62,7 +62,10 @@ def make_test(
return res
d.addBoth(on_done)
reactor.callWhenRunning(lambda: d.callback(True))
# type-ignore: This is outside of Synapse (just a utility benchmark script)
# so we don't need to worry about which server the logs are coming from
# (`Clock.call_when_running` manages the logcontext for us).
reactor.callWhenRunning(lambda: d.callback(True)) # type: ignore[prefer-synapse-clock-call-when-running]
reactor.run()
# mypy thinks this is an object for some reason.

View File

@@ -37,7 +37,7 @@ from synapse.config.logger import _setup_stdlib_logging
from synapse.logging import RemoteHandler
from synapse.synapse_rust import reset_logging_config
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.clock import Clock
class LineCounter(LineOnlyReceiver):

View File

@@ -39,7 +39,7 @@ from synapse.appservice import ApplicationService
from synapse.server import HomeServer
from synapse.storage.databases.main.registration import TokenLookupResult
from synapse.types import Requester, UserID
from synapse.util import Clock
from synapse.util.clock import Clock
from tests import unittest
from tests.unittest import override_config

View File

@@ -33,7 +33,7 @@ from synapse.api.filtering import Filter
from synapse.api.presence import UserPresenceState
from synapse.server import HomeServer
from synapse.types import JsonDict, UserID
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.frozenutils import freeze
from tests import unittest

View File

@@ -17,7 +17,7 @@ from twisted.internet.testing import MemoryReactor
from synapse.api.urls import LoginSSORedirectURIBuilder
from synapse.server import HomeServer
from synapse.util import Clock
from synapse.util.clock import Clock
from tests.unittest import HomeserverTestCase

View File

@@ -29,7 +29,7 @@ from synapse.app.homeserver import SynapseHomeServer
from synapse.config.server import parse_listener_def
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from synapse.util.clock import Clock
from tests.server import make_request
from tests.unittest import HomeserverTestCase

View File

@@ -2,7 +2,7 @@ import synapse
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.util import Clock
from synapse.util.clock import Clock
from tests.server import ThreadedMemoryReactorClock
from tests.unittest import HomeserverTestCase

View File

@@ -26,7 +26,7 @@ from twisted.internet.testing import MemoryReactor
from synapse.appservice import ApplicationService
from synapse.server import HomeServer
from synapse.types import JsonDict, UserID
from synapse.util import Clock
from synapse.util.clock import Clock
from tests import unittest
from tests.unittest import override_config

View File

@@ -41,7 +41,7 @@ from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable
from synapse.server import HomeServer
from synapse.types import DeviceListUpdates, JsonDict
from synapse.util import Clock
from synapse.util.clock import Clock
from tests import unittest

View File

@@ -27,7 +27,7 @@ import synapse.rest.client.room
from synapse.config._base import RootConfig
from synapse.config.room_directory import RoomDirectoryConfig
from synapse.server import HomeServer
from synapse.util import Clock
from synapse.util.clock import Clock
from tests import unittest
from tests.unittest import override_config

View File

@@ -49,7 +49,7 @@ from synapse.logging.context import (
from synapse.server import HomeServer
from synapse.storage.keys import FetchKeyResult
from synapse.types import JsonDict
from synapse.util import Clock
from synapse.util.clock import Clock
from tests import unittest
from tests.unittest import logcontext_clean, override_config

View File

@@ -41,7 +41,7 @@ from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.types import StreamToken, UserID, UserInfo, create_requester
from synapse.util import Clock
from synapse.util.clock import Clock
from tests.handlers.test_sync import generate_sync_config
from tests.unittest import (

View File

@@ -34,7 +34,7 @@ from synapse.rest import admin
from synapse.rest.client import login, presence, room
from synapse.server import HomeServer
from synapse.types import JsonDict, StreamToken, create_requester
from synapse.util import Clock
from synapse.util.clock import Clock
from tests.handlers.test_sync import SyncRequestKey, generate_sync_config
from tests.unittest import (

View File

@@ -26,7 +26,7 @@ from synapse.events.snapshot import EventContext
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.util import Clock
from synapse.util.clock import Clock
from tests import unittest
from tests.test_utils.event_injection import create_event

View File

@@ -16,7 +16,7 @@ from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.retryutils import NotRetryingDestination
from tests.test_utils import event_injection

View File

@@ -30,7 +30,7 @@ from synapse.events import EventBase
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.util import Clock
from synapse.util.clock import Clock
from tests.test_utils import FakeResponse, event_injection
from tests.unittest import FederatingHomeserverTestCase

View File

@@ -26,7 +26,7 @@ from twisted.internet.testing import MemoryReactor
from synapse.handlers.device import DeviceListUpdater
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from synapse.util.clock import Clock
from synapse.util.retryutils import NotRetryingDestination
from tests import unittest

View File

@@ -32,7 +32,7 @@ from synapse.media.storage_provider import (
)
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from synapse.util.clock import Clock
from tests import unittest
from tests.media.test_media_storage import small_png

Some files were not shown because too many files have changed in this diff Show More