Compare commits

10 Commits

Author         SHA1        Message                                                                Date
Erik Johnston  a99c692906  Merge branch 'erikj/reduce_size_of_cache' into erikj/merge_cache_prs  2021-04-26 16:35:34 +01:00
Erik Johnston  58b5bbb445  Reduce memory footprint of caches                                      2021-04-26 16:25:11 +01:00
Erik Johnston  5add13e05d  Newsfile                                                               2021-04-26 11:13:08 +01:00
Erik Johnston  2bf93f9b34  Fix                                                                    2021-04-26 10:58:04 +01:00
Erik Johnston  bcf8858b67  Don't explode if memory has been twiddled                              2021-04-26 10:56:42 +01:00
Erik Johnston  99fb72e63e  Move TRACK_MEMORY_USAGE to root                                        2021-04-26 10:50:15 +01:00
Erik Johnston  567fe5e387  Make TRACK_MEMORY_USAGE configurable                                   2021-04-26 10:39:54 +01:00
Erik Johnston  0c9bab290f  Ignore singletons                                                      2021-04-26 10:29:26 +01:00
Erik Johnston  5003bd29d2  Don't have a global Asizer                                             2021-04-23 17:16:49 +01:00
Erik Johnston  e9f5812eff  Track memory usage of caches                                           2021-04-23 16:26:10 +01:00
8 changed files with 148 additions and 20 deletions

changelog.d/9881.feature (new file)

@@ -0,0 +1 @@
+Add experimental option to track memory usage of the caches.

mypy.ini

@@ -172,3 +172,6 @@ ignore_missing_imports = True
[mypy-txacme.*]
ignore_missing_imports = True
+[mypy-pympler.*]
+ignore_missing_imports = True

synapse/app/generic_worker.py

@@ -454,6 +454,7 @@ def start(config_options):
    config.server.update_user_directory = False
    synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
+    synapse.util.caches.lrucache.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
    hs = GenericWorkerServer(
        config.server_name,

synapse/app/homeserver.py

@@ -341,6 +341,7 @@ def setup(config_options):
        sys.exit(0)
    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+    synapse.util.caches.lrucache.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
    hs = SynapseHomeServer(
        config.server_name,
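Both entry points flip the same module-level flag before any caches are constructed, so per-node sizing only happens when the option is enabled. A minimal sketch of that pattern (the helper name and config object here are illustrative, not Synapse code):

import synapse.util.caches.lrucache

def configure_cache_tracking(config) -> None:
    # Set the module-level flag once at startup, before any caches are built;
    # _Node.__init__ consults it on every cache insert (see lrucache.py below).
    synapse.util.caches.lrucache.TRACK_MEMORY_USAGE = config.caches.track_memory_usage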

synapse/config/cache.py

@@ -17,6 +17,8 @@ import re
import threading
from typing import Callable, Dict
+from synapse.python_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError
# The prefix for all cache factor-related environment variables
@@ -189,6 +191,15 @@ class CacheConfig(Config):
                )
            self.cache_factors[cache] = factor
+        self.track_memory_usage = cache_config.get("track_memory_usage", False)
+        if self.track_memory_usage:
+            try:
+                check_requirements("cache_memory")
+            except DependencyException as e:
+                raise ConfigError(
+                    e.message  # noqa: B306, DependencyException.message is a property
+                )
        # Resize all caches (if necessary) with the new factors we've loaded
        self.resize_all_caches()
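With this in place, enabling the feature should just be a homeserver.yaml setting; a sketch of the expected layout, given that the option is read from the caches section via cache_config.get above:

caches:
  track_memory_usage: true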

synapse/python_dependencies.py

@@ -121,6 +121,7 @@ CONDITIONAL_REQUIREMENTS = {
    # hiredis is not a *strict* dependency, but it makes things much faster.
    # (if it is not installed, we fall back to slow code.)
    "redis": ["txredisapi>=1.4.7", "hiredis"],
+    "cache_memory": ["pympler"],
}
ALL_OPTIONAL_REQUIREMENTS = set()  # type: Set[str]
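Declaring pympler under a "cache_memory" extra keeps it optional: it is only checked when the config option is turned on (see the check_requirements call in synapse/config/cache.py above). A rough sketch of that gate:

from synapse.python_dependencies import DependencyException, check_requirements

try:
    # Raises DependencyException if the "cache_memory" extra (pympler) is missing.
    check_requirements("cache_memory")
except DependencyException as e:
    print("cache memory tracking unavailable:", e.message)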

synapse/util/caches/__init__.py

@@ -24,6 +24,11 @@ from synapse.config.cache import add_resizable_cache
logger = logging.getLogger(__name__)
+# Whether to track estimated memory usage of the LruCaches.
+TRACK_MEMORY_USAGE = False
caches_by_name = {}  # type: Dict[str, Sized]
collectors_by_name = {}  # type: Dict[str, CacheMetric]
@@ -32,6 +37,11 @@ cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
+cache_memory_usage = Gauge(
+    "synapse_util_caches_cache_memory_usage",
+    "Estimated size in bytes of the caches",
+    ["name"],
+)
response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
@@ -52,6 +62,7 @@ class CacheMetric:
    hits = attr.ib(default=0)
    misses = attr.ib(default=0)
    evicted_size = attr.ib(default=0)
+    memory_usage = attr.ib(default=None)
    def inc_hits(self):
        self.hits += 1
@@ -62,6 +73,19 @@ class CacheMetric:
    def inc_evictions(self, size=1):
        self.evicted_size += size
+    def inc_memory_usage(self, memory: int):
+        if self.memory_usage is None:
+            self.memory_usage = 0
+        self.memory_usage += memory
+
+    def dec_memory_usage(self, memory: int):
+        self.memory_usage -= memory
+
+    def clear_memory_usage(self):
+        if self.memory_usage is not None:
+            self.memory_usage = 0
    def describe(self):
        return []
@@ -81,6 +105,8 @@ class CacheMetric:
                cache_total.labels(self._cache_name).set(self.hits + self.misses)
                if getattr(self._cache, "max_size", None):
                    cache_max_size.labels(self._cache_name).set(self._cache.max_size)
+                if self.memory_usage is not None:
+                    cache_memory_usage.labels(self._cache_name).set(self.memory_usage)
            if self._collect_callback:
                self._collect_callback()
        except Exception as e:
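Since memory_usage starts as None, collect() only exports the gauge for caches that have actually recorded a size, instead of reporting a misleading zero. For reference, a labelled prometheus_client Gauge works like this (the metric name below is illustrative):

from prometheus_client import Gauge

example_gauge = Gauge("example_cache_memory_usage", "Estimated size in bytes", ["name"])
example_gauge.labels("some_cache").set(4096)  # one time series per cache name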

synapse/util/caches/lrucache.py

@@ -17,8 +17,10 @@ from functools import wraps
from typing import (
    Any,
    Callable,
+    Collection,
    Generic,
    Iterable,
+    List,
    Optional,
    Type,
    TypeVar,
@@ -30,9 +32,35 @@ from typing import (
from typing_extensions import Literal
from synapse.config import cache as cache_config
-from synapse.util.caches import CacheMetric, register_cache
+from synapse.util.caches import TRACK_MEMORY_USAGE, CacheMetric, register_cache
from synapse.util.caches.treecache import TreeCache
+try:
+    from pympler.asizeof import Asizer
+
+    def _get_size_of(val: Any, *, recurse=True) -> int:
+        """Get an estimate of the size in bytes of the object.
+
+        Args:
+            val: The object to size.
+            recurse: If true will include referenced values in the size,
+                otherwise only sizes the given object.
+        """
+        # Ignore singleton values when calculating memory usage.
+        if val in ((), None, ""):
+            return 0
+
+        sizer = Asizer()
+        sizer.exclude_refs((), None, "")
+        return sizer.asizeof(val, limit=100 if recurse else 0)
+
+except ImportError:
+
+    def _get_size_of(val: Any, *, recurse=True) -> int:
+        return 0
# Function type: the type used for invalidation callbacks
FT = TypeVar("FT", bound=Callable[..., Any])
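The try/except keeps pympler strictly optional: if the import fails, _get_size_of degrades to returning 0 and no caller needs to care. A quick illustration of the Asizer calls used above (the sample value is arbitrary):

from pympler.asizeof import Asizer  # available via the "cache_memory" extra

sizer = Asizer()
sizer.exclude_refs((), None, "")  # don't charge shared singletons to any one entry
print(sizer.asizeof({"key": ["some", "value"]}, limit=100))  # recurse into references
print(sizer.asizeof({"key": ["some", "value"]}, limit=0))    # size the dict object alone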
@@ -54,16 +82,67 @@ def enumerate_leaves(node, depth):
class _Node:
-    __slots__ = ["prev_node", "next_node", "key", "value", "callbacks"]
+    __slots__ = ["prev_node", "next_node", "key", "value", "callbacks", "memory"]
    def __init__(
-        self, prev_node, next_node, key, value, callbacks: Optional[set] = None
+        self,
+        prev_node,
+        next_node,
+        key,
+        value,
+        callbacks: Collection[Callable[[], None]] = (),
    ):
        self.prev_node = prev_node
        self.next_node = next_node
        self.key = key
        self.value = value
-        self.callbacks = callbacks or set()
+        self.memory = 0
+
+        # Set of callbacks to run when the node gets deleted. We store as a list
+        # rather than a set to keep memory usage down (and since we expect few
+        # entries per node the performance of checking for duplication in a list
+        # vs using a set is negligible).
+        #
+        # Note that we store this as an optional list to keep the memory
+        # footprint down. Empty lists are 56 bytes (and empty sets are 216 bytes).
+        self.callbacks = None  # type: Optional[List[Callable[[], None]]]
+        self.add_callbacks(callbacks)
+
+        if TRACK_MEMORY_USAGE:
+            self.memory = (
+                _get_size_of(key)
+                + _get_size_of(value)
+                + _get_size_of(self.callbacks, recurse=False)
+                + _get_size_of(self, recurse=False)
+            )
+            self.memory += _get_size_of(self.memory, recurse=False)
+
+    def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None:
+        """Add to stored list of callbacks, removing duplicates."""
+        if not callbacks:
+            return
+
+        if not self.callbacks:
+            self.callbacks = []
+
+        for callback in callbacks:
+            if callback not in self.callbacks:
+                self.callbacks.append(callback)
+
+    def run_and_clear_callbacks(self) -> None:
+        """Run all callbacks and clear the stored set of callbacks. Used when
+        the node is being deleted.
+        """
+        if not self.callbacks:
+            return
+
+        for callback in self.callbacks:
+            callback()
+
+        self.callbacks = None
class LruCache(Generic[KT, VT]):
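The byte counts in the comment above are easy to verify; on a 64-bit CPython build (exact numbers vary by version and platform):

import sys

print(sys.getsizeof([]))     # 56: even an empty list costs real memory
print(sys.getsizeof(set()))  # 216: an empty set is roughly 4x larger
print(sys.getsizeof(None))   # 16: and None is a shared singleton, so storing it is free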
@@ -177,10 +256,10 @@ class LruCache(Generic[KT, VT]):
        self.len = synchronized(cache_len)
-        def add_node(key, value, callbacks: Optional[set] = None):
+        def add_node(key, value, callbacks: Collection[Callable[[], None]] = ()):
            prev_node = list_root
            next_node = prev_node.next_node
-            node = _Node(prev_node, next_node, key, value, callbacks or set())
+            node = _Node(prev_node, next_node, key, value, callbacks)
            prev_node.next_node = node
            next_node.prev_node = node
            cache[key] = node
@@ -188,6 +267,9 @@ class LruCache(Generic[KT, VT]):
            if size_callback:
                cached_cache_len[0] += size_callback(node.value)
+            if TRACK_MEMORY_USAGE and metrics:
+                metrics.inc_memory_usage(node.memory)
        def move_node_to_front(node):
            prev_node = node.prev_node
            next_node = node.next_node
@@ -211,16 +293,18 @@
                deleted_len = size_callback(node.value)
                cached_cache_len[0] -= deleted_len
-            for cb in node.callbacks:
-                cb()
-            node.callbacks.clear()
+            node.run_and_clear_callbacks()
+
+            if TRACK_MEMORY_USAGE and metrics:
+                metrics.dec_memory_usage(node.memory)
            return deleted_len
        @overload
        def cache_get(
            key: KT,
            default: Literal[None] = None,
-            callbacks: Iterable[Callable[[], None]] = ...,
+            callbacks: Collection[Callable[[], None]] = ...,
            update_metrics: bool = ...,
        ) -> Optional[VT]:
            ...
@@ -229,7 +313,7 @@
        def cache_get(
            key: KT,
            default: T,
-            callbacks: Iterable[Callable[[], None]] = ...,
+            callbacks: Collection[Callable[[], None]] = ...,
            update_metrics: bool = ...,
        ) -> Union[T, VT]:
            ...
@@ -238,13 +322,13 @@
        def cache_get(
            key: KT,
            default: Optional[T] = None,
-            callbacks: Iterable[Callable[[], None]] = (),
+            callbacks: Collection[Callable[[], None]] = (),
            update_metrics: bool = True,
        ):
            node = cache.get(key, None)
            if node is not None:
                move_node_to_front(node)
-                node.callbacks.update(callbacks)
+                node.add_callbacks(callbacks)
                if update_metrics and metrics:
                    metrics.inc_hits()
                return node.value
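For orientation, cache_get is bound onto the cache as LruCache.get, so callers interact with it roughly like this (a sketch; the constructor arguments beyond max_size and the lambda invalidation hook are illustrative assumptions):

from synapse.util.caches.lrucache import LruCache

cache = LruCache(max_size=10)  # type: LruCache[str, str]
cache.set("key", "value", callbacks=[lambda: print("'key' invalidated")])
assert cache.get("key") == "value"
cache.pop("key")  # delete_node fires the callback via run_and_clear_callbacks()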
@@ -260,10 +344,8 @@
                # We sometimes store large objects, e.g. dicts, which cause
                # the inequality check to take a long time. So let's only do
                # the check if we have some callbacks to call.
-                if node.callbacks and value != node.value:
-                    for cb in node.callbacks:
-                        cb()
-                    node.callbacks.clear()
+                if value != node.value:
+                    node.run_and_clear_callbacks()
                # We don't bother to protect this by value != node.value as
                # generally size_callback will be cheap compared with equality
@@ -273,7 +355,7 @@
                    cached_cache_len[0] -= size_callback(node.value)
                    cached_cache_len[0] += size_callback(value)
-                node.callbacks.update(callbacks)
+                node.add_callbacks(callbacks)
                move_node_to_front(node)
                node.value = value
@@ -326,12 +408,14 @@
            list_root.next_node = list_root
            list_root.prev_node = list_root
            for node in cache.values():
-                for cb in node.callbacks:
-                    cb()
+                node.run_and_clear_callbacks()
            cache.clear()
            if size_callback:
                cached_cache_len[0] = 0
+
+            if TRACK_MEMORY_USAGE and metrics:
+                metrics.clear_memory_usage()
        @synchronized
        def cache_contains(key: KT) -> bool:
            return key in cache