# # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2020 The Matrix.org Foundation C.I.C. # Copyright (C) 2023 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # See the GNU Affero General Public License for more details: # . # # Originally licensed under the Apache License, Version 2.0: # . # # [This file includes modifications made by New Vector Limited] # # """This is a mypy plugin for Synpase to deal with some of the funky typing that can crop up, e.g the cache descriptors. """ from typing import Callable, Optional, Tuple, Type, Union import mypy.types from mypy.erasetype import remove_instance_last_known_values from mypy.errorcodes import ErrorCode from mypy.nodes import ARG_NAMED_OPT, ListExpr, NameExpr, TempNode, Var from mypy.plugin import ( FunctionLike, FunctionSigContext, MethodSigContext, Plugin, ) from mypy.typeops import bind_self from mypy.types import ( AnyType, CallableType, Instance, NoneType, TupleType, TypeAliasType, TypeVarType, UninhabitedType, UnionType, ) PROMETHEUS_METRIC_MISSING_SERVER_NAME_LABEL = ErrorCode( "missing-server-name-label", "`SERVER_NAME_LABEL` required in metric", category="per-homeserver-tenant-metrics", ) class SynapsePlugin(Plugin): def get_function_signature_hook( self, fullname: str ) -> Optional[Callable[[FunctionSigContext], FunctionLike]]: if fullname in ( "prometheus_client.metrics.Counter", # TODO: Add other prometheus_client metrics that need checking as we # refactor, see https://github.com/element-hq/synapse/issues/18592 ): return check_prometheus_metric_instantiation return None def get_method_signature_hook( self, fullname: str ) -> Optional[Callable[[MethodSigContext], CallableType]]: if fullname.startswith( ( "synapse.util.caches.descriptors.CachedFunction.__call__", "synapse.util.caches.descriptors._LruCachedFunction.__call__", ) ): return cached_function_method_signature if fullname in ( "synapse.util.caches.descriptors._CachedFunctionDescriptor.__call__", "synapse.util.caches.descriptors._CachedListFunctionDescriptor.__call__", ): return check_is_cacheable_wrapper return None def check_prometheus_metric_instantiation(ctx: FunctionSigContext) -> CallableType: """ Ensure that the `prometheus_client` metrics include the `SERVER_NAME_LABEL` label when instantiated. This is important because we support multiple Synapse instances running in the same process, where all metrics share a single global `REGISTRY`. The `server_name` label ensures metrics are correctly separated by homeserver. There are also some metrics that apply at the process level, such as CPU usage, Python garbage collection, Twisted reactor tick time which shouldn't have the `SERVER_NAME_LABEL`. In those cases, use use a type ignore comment to disable the check, e.g. `# type: ignore[missing-server-name-label]`. """ # The true signature, this isn't being modified so this is what will be returned. signature: CallableType = ctx.default_signature # Sanity check the arguments are still as expected in this version of # `prometheus_client`. ex. `Counter(name, documentation, labelnames, ...)` # # `signature.arg_names` should be: ["name", "documentation", "labelnames", ...] if len(signature.arg_names) < 3 or signature.arg_names[2] != "labelnames": ctx.api.fail( f"Expected the 3rd argument of {signature.name} to be 'labelnames', but got " f"{signature.arg_names[2]}", ctx.context, ) return signature # Ensure mypy is passing the correct number of arguments because we are doing some # dirty indexing into `ctx.args` later on. assert len(ctx.args) == len(signature.arg_names), ( f"Expected the list of arguments in the {signature.name} signature ({len(signature.arg_names)})" f"to match the number of arguments from the function signature context ({len(ctx.args)})" ) # Check if the `labelnames` argument includes `SERVER_NAME_LABEL` # # `ctx.args` should look like this: # ``` # [ # [StrExpr("name")], # [StrExpr("documentation")], # [ListExpr([StrExpr("label1"), StrExpr("label2")])] # ... # ] # ``` labelnames_arg_expression = ctx.args[2][0] if len(ctx.args[2]) > 0 else None if isinstance(labelnames_arg_expression, ListExpr): # Check if the `labelnames` argument includes the `server_name` label (`SERVER_NAME_LABEL`). for labelname_expression in labelnames_arg_expression.items: if ( isinstance(labelname_expression, NameExpr) and labelname_expression.fullname == "synapse.metrics.SERVER_NAME_LABEL" ): # Found the `SERVER_NAME_LABEL`, all good! break else: ctx.api.fail( f"Expected {signature.name} to include `SERVER_NAME_LABEL` in the list of labels. " "If this is a process-level metric (vs homeserver-level), use a type ignore comment " "to disable this check.", ctx.context, code=PROMETHEUS_METRIC_MISSING_SERVER_NAME_LABEL, ) else: ctx.api.fail( f"Expected the `labelnames` argument of {signature.name} to be a list of label names " f"(including `SERVER_NAME_LABEL`), but got {labelnames_arg_expression}. " "If this is a process-level metric (vs homeserver-level), use a type ignore comment " "to disable this check.", ctx.context, code=PROMETHEUS_METRIC_MISSING_SERVER_NAME_LABEL, ) return signature return signature def _get_true_return_type(signature: CallableType) -> mypy.types.Type: """ Get the "final" return type of a callable which might return an Awaitable/Deferred. """ if isinstance(signature.ret_type, Instance): # If a coroutine, unwrap the coroutine's return type. if signature.ret_type.type.fullname == "typing.Coroutine": return signature.ret_type.args[2] # If an awaitable, unwrap the awaitable's final value. elif signature.ret_type.type.fullname == "typing.Awaitable": return signature.ret_type.args[0] # If a Deferred, unwrap the Deferred's final value. elif signature.ret_type.type.fullname == "twisted.internet.defer.Deferred": return signature.ret_type.args[0] # Otherwise, return the raw value of the function. return signature.ret_type def cached_function_method_signature(ctx: MethodSigContext) -> CallableType: """Fixes the `CachedFunction.__call__` signature to be correct. It already has *almost* the correct signature, except: 1. the `self` argument needs to be marked as "bound"; 2. any `cache_context` argument should be removed; 3. an optional keyword argument `on_invalidated` should be added. 4. Wrap the return type to always be a Deferred. """ # 1. Mark this as a bound function signature. signature: CallableType = bind_self(ctx.default_signature) # 2. Remove any "cache_context" args. # # Note: We should be only doing this if `cache_context=True` is set, but if # it isn't then the code will raise an exception when its called anyway, so # it's not the end of the world. context_arg_index = None for idx, name in enumerate(signature.arg_names): if name == "cache_context": context_arg_index = idx break arg_types = list(signature.arg_types) arg_names = list(signature.arg_names) arg_kinds = list(signature.arg_kinds) if context_arg_index: arg_types.pop(context_arg_index) arg_names.pop(context_arg_index) arg_kinds.pop(context_arg_index) # 3. Add an optional "on_invalidate" argument. # # This is a either # - a callable which accepts no input and returns nothing, or # - None. calltyp = UnionType( [ NoneType(), CallableType( arg_types=[], arg_kinds=[], arg_names=[], ret_type=NoneType(), fallback=ctx.api.named_generic_type("builtins.function", []), ), ] ) arg_types.append(calltyp) arg_names.append("on_invalidate") arg_kinds.append(ARG_NAMED_OPT) # Arg is an optional kwarg. # 4. Ensure the return type is a Deferred. ret_arg = _get_true_return_type(signature) # This should be able to use ctx.api.named_generic_type, but that doesn't seem # to find the correct symbol for anything more than 1 module deep. # # modules is not part of CheckerPluginInterface. The following is a combination # of TypeChecker.named_generic_type and TypeChecker.lookup_typeinfo. sym = ctx.api.modules["twisted.internet.defer"].names.get("Deferred") # type: ignore[attr-defined] ret_type = Instance(sym.node, [remove_instance_last_known_values(ret_arg)]) signature = signature.copy_modified( arg_types=arg_types, arg_names=arg_names, arg_kinds=arg_kinds, ret_type=ret_type, ) return signature def check_is_cacheable_wrapper(ctx: MethodSigContext) -> CallableType: """Asserts that the signature of a method returns a value which can be cached. Makes no changes to the provided method signature. """ # The true signature, this isn't being modified so this is what will be returned. signature: CallableType = ctx.default_signature if not isinstance(ctx.args[0][0], TempNode): ctx.api.note("Cached function is not a TempNode?!", ctx.context) # type: ignore[attr-defined] return signature orig_sig = ctx.args[0][0].type if not isinstance(orig_sig, CallableType): ctx.api.fail("Cached 'function' is not a callable", ctx.context) return signature check_is_cacheable(orig_sig, ctx) return signature def check_is_cacheable( signature: CallableType, ctx: Union[MethodSigContext, FunctionSigContext], ) -> None: """ Check if a callable returns a type which can be cached. Args: signature: The callable to check. ctx: The signature context, used for error reporting. """ # Unwrap the true return type from the cached function. return_type = _get_true_return_type(signature) verbose = ctx.api.options.verbosity >= 1 # TODO Technically a cachedList only needs immutable values, but forcing them # to return Mapping instead of Dict is fine. ok, note = is_cacheable(return_type, signature, verbose) if ok: message = f"function {signature.name} is @cached, returning {return_type}" else: message = f"function {signature.name} is @cached, but has mutable return value {return_type}" if note: message += f" ({note})" message = message.replace("builtins.", "").replace("typing.", "") if ok and note: ctx.api.note(message, ctx.context) # type: ignore[attr-defined] elif not ok: ctx.api.fail(message, ctx.context, code=AT_CACHED_MUTABLE_RETURN) # Immutable simple values. IMMUTABLE_VALUE_TYPES = { "builtins.bool", "builtins.int", "builtins.float", "builtins.str", "builtins.bytes", } # Types defined in Synapse which are known to be immutable. IMMUTABLE_CUSTOM_TYPES = { "synapse.synapse_rust.acl.ServerAclEvaluator", "synapse.synapse_rust.push.FilteredPushRules", # This is technically not immutable, but close enough. "signedjson.types.VerifyKey", "synapse.types.StrCollection", } # Immutable containers only if the values are also immutable. IMMUTABLE_CONTAINER_TYPES_REQUIRING_IMMUTABLE_ELEMENTS = { "builtins.frozenset", "builtins.tuple", "typing.AbstractSet", "typing.Sequence", "immutabledict.immutabledict", } MUTABLE_CONTAINER_TYPES = { "builtins.set", "builtins.list", "builtins.dict", } AT_CACHED_MUTABLE_RETURN = ErrorCode( "synapse-@cached-mutable", "@cached() should have an immutable return type", "General", ) def is_cacheable( rt: mypy.types.Type, signature: CallableType, verbose: bool ) -> Tuple[bool, Optional[str]]: """ Check if a particular type is cachable. A type is cachable if it is immutable; for complex types this recurses to check each type parameter. Returns: a 2-tuple (cacheable, message). - cachable: False means the type is definitely not cacheable; true means anything else. - Optional message. """ # This should probably be done via a TypeVisitor. Apologies to the reader! if isinstance(rt, AnyType): return True, ("may be mutable" if verbose else None) elif isinstance(rt, Instance): if ( rt.type.fullname in IMMUTABLE_VALUE_TYPES or rt.type.fullname in IMMUTABLE_CUSTOM_TYPES ): # "Simple" types are generally immutable. return True, None elif rt.type.fullname == "typing.Mapping": # Generally mapping keys are immutable, but they only *have* to be # hashable, which doesn't imply immutability. E.g. Mapping[K, V] # is cachable iff K and V are cachable. return is_cacheable(rt.args[0], signature, verbose) and is_cacheable( rt.args[1], signature, verbose ) elif rt.type.fullname in IMMUTABLE_CONTAINER_TYPES_REQUIRING_IMMUTABLE_ELEMENTS: # E.g. Collection[T] is cachable iff T is cachable. return is_cacheable(rt.args[0], signature, verbose) elif rt.type.fullname in MUTABLE_CONTAINER_TYPES: # Mutable containers are mutable regardless of their underlying type. return False, f"container {rt.type.fullname} is mutable" elif "attrs" in rt.type.metadata: # attrs classes are only cachable iff it is frozen (immutable itself) # and all attributes are cachable. frozen = rt.type.metadata["attrs"]["frozen"] if frozen: for attribute in rt.type.metadata["attrs"]["attributes"]: attribute_name = attribute["name"] symbol_node = rt.type.names[attribute_name].node assert isinstance(symbol_node, Var) assert symbol_node.type is not None ok, note = is_cacheable(symbol_node.type, signature, verbose) if not ok: return False, f"non-frozen attrs property: {attribute_name}" # All attributes were frozen. return True, None else: return False, "non-frozen attrs class" elif rt.type.is_enum: # We assume Enum values are immutable return True, None else: # Ensure we fail for unknown types, these generally means that the # above code is not complete. return ( False, f"Don't know how to handle {rt.type.fullname} return type instance", ) elif isinstance(rt, TypeVarType): # We consider TypeVars immutable if they are bound to a set of immutable # types. if rt.values: for value in rt.values: ok, note = is_cacheable(value, signature, verbose) if not ok: return False, f"TypeVar bound not cacheable {value}" return True, None return False, "TypeVar is unbound" elif isinstance(rt, NoneType): # None is cachable. return True, None elif isinstance(rt, (TupleType, UnionType)): # Tuples and unions are cachable iff all their items are cachable. for item in rt.items: ok, note = is_cacheable(item, signature, verbose) if not ok: return False, note # This discards notes but that's probably fine return True, None elif isinstance(rt, TypeAliasType): # For a type alias, check if the underlying real type is cachable. return is_cacheable(mypy.types.get_proper_type(rt), signature, verbose) elif isinstance(rt, UninhabitedType): # There is no return value, just consider it cachable. This is only used # in tests. return True, None else: # Ensure we fail for unknown types, these generally means that the # above code is not complete. return False, f"Don't know how to handle {type(rt).__qualname__} return type" def plugin(version: str) -> Type[SynapsePlugin]: # This is the entry point of the plugin, and lets us deal with the fact # that the mypy plugin interface is *not* stable by looking at the version # string. # # However, since we pin the version of mypy Synapse uses in CI, we don't # really care. return SynapsePlugin