mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
26 Commits
anoa/docs_
...
erikj/cach
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e2b2decd11 | ||
|
|
d088695f15 | ||
|
|
0fddb9aacd | ||
|
|
b05667b610 | ||
|
|
8d308066db | ||
|
|
2bb036eda7 | ||
|
|
5deb349a7f | ||
|
|
2e78ef4d22 | ||
|
|
0f82904fdd | ||
|
|
f2e61d3cc1 | ||
|
|
3d6c982c41 | ||
|
|
bebb7f0f60 | ||
|
|
d6b8e471cf | ||
|
|
e73881a439 | ||
|
|
51a728ec24 | ||
|
|
acd2778d61 | ||
|
|
ffa6e96b5f | ||
|
|
a1dfe34d86 | ||
|
|
c232e16d23 | ||
|
|
ee36be5eef | ||
|
|
b3e99c25bf | ||
|
|
f83ad8dd2d | ||
|
|
915163f72f | ||
|
|
9596641e1d | ||
|
|
c4d468b69f | ||
|
|
f38350fdda |
@@ -614,3 +614,55 @@ __all__ = [
|
||||
"InFlightGauge",
|
||||
"BucketCollector",
|
||||
]
|
||||
|
||||
|
||||
try:
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
|
||||
jemalloc = ctypes.CDLL(ctypes.util.find_library("jemalloc"))
|
||||
|
||||
def get_val(name):
|
||||
allocated = ctypes.c_size_t(0)
|
||||
allocated_len = ctypes.c_size_t(ctypes.sizeof(allocated))
|
||||
jemalloc.mallctl(
|
||||
name.encode("ascii"),
|
||||
ctypes.byref(allocated),
|
||||
ctypes.byref(allocated_len),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
return allocated.value
|
||||
|
||||
def refresh_stats():
|
||||
epoch = ctypes.c_uint64(0)
|
||||
jemalloc.mallctl(
|
||||
b"epoch", None, None, ctypes.byref(epoch), ctypes.sizeof(epoch)
|
||||
)
|
||||
|
||||
class JemallocCollector(object):
|
||||
def collect(self):
|
||||
refresh_stats()
|
||||
|
||||
g = GaugeMetricFamily(
|
||||
"jemalloc_stats_app_memory",
|
||||
"",
|
||||
labels=["type"],
|
||||
)
|
||||
for t in (
|
||||
"allocated",
|
||||
"active",
|
||||
"resident",
|
||||
"mapped",
|
||||
"retained",
|
||||
"metadata",
|
||||
):
|
||||
g.add_metric([t], value=get_val(f"stats.{t}"))
|
||||
|
||||
yield g
|
||||
|
||||
REGISTRY.register(JemallocCollector())
|
||||
|
||||
|
||||
except Exception:
|
||||
logger.exception("Failed to start jemalloc metrics")
|
||||
|
||||
@@ -345,6 +345,15 @@ class RulesForRoom:
|
||||
# to self around in the callback.
|
||||
self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id)
|
||||
|
||||
def get_data_for_memory_size(self):
|
||||
return (
|
||||
self.member_map,
|
||||
self.rules_by_user,
|
||||
self.state_group,
|
||||
self.sequence,
|
||||
self.uninteresting_user_set,
|
||||
)
|
||||
|
||||
async def get_rules(
|
||||
self, event: EventBase, context: EventContext
|
||||
) -> Dict[str, List[Dict[str, dict]]]:
|
||||
|
||||
@@ -1043,6 +1043,9 @@ class _JoinedHostsCache:
|
||||
|
||||
self._len = 0
|
||||
|
||||
def get_data_for_memory_size(self):
|
||||
return (self.hosts_to_joined_users, self.state_group)
|
||||
|
||||
async def get_destinations(self, state_entry: "_StateCacheEntry") -> Set[str]:
|
||||
"""Get set of destinations for a state entry
|
||||
|
||||
|
||||
@@ -231,8 +231,8 @@ class DomainSpecificString(
|
||||
# Deny iteration because it will bite you if you try to create a singleton
|
||||
# set by:
|
||||
# users = set(user)
|
||||
def __iter__(self):
|
||||
raise ValueError("Attempted to iterate a %s" % (type(self).__name__,))
|
||||
# def __iter__(self):
|
||||
# raise ValueError("Attempted to iterate a %s" % (type(self).__name__,))
|
||||
|
||||
# Because this class is a namedtuple of strings and booleans, it is deeply
|
||||
# immutable.
|
||||
|
||||
@@ -33,6 +33,7 @@ cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
|
||||
cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
|
||||
cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
|
||||
cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
|
||||
cache_memory_usage = Gauge("synapse_util_caches_cache_memory_usage", "", ["name"])
|
||||
|
||||
response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
|
||||
response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
|
||||
@@ -53,6 +54,7 @@ class CacheMetric:
|
||||
hits = attr.ib(default=0)
|
||||
misses = attr.ib(default=0)
|
||||
evicted_size = attr.ib(default=0)
|
||||
memory_usage = attr.ib(default=0)
|
||||
|
||||
def inc_hits(self):
|
||||
self.hits += 1
|
||||
@@ -80,6 +82,7 @@ class CacheMetric:
|
||||
cache_hits.labels(self._cache_name).set(self.hits)
|
||||
cache_evicted.labels(self._cache_name).set(self.evicted_size)
|
||||
cache_total.labels(self._cache_name).set(self.hits + self.misses)
|
||||
cache_memory_usage.labels(self._cache_name).set(self.memory_usage)
|
||||
if getattr(self._cache, "max_size", None):
|
||||
cache_max_size.labels(self._cache_name).set(self._cache.max_size)
|
||||
if self._collect_callback:
|
||||
|
||||
@@ -27,6 +27,7 @@ from typing import (
|
||||
cast,
|
||||
overload,
|
||||
)
|
||||
import random
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
@@ -34,6 +35,11 @@ from synapse.config import cache as cache_config
|
||||
from synapse.util.caches import CacheMetric, register_cache
|
||||
from synapse.util.caches.treecache import TreeCache
|
||||
|
||||
try:
|
||||
from pympler import asizeof
|
||||
except ImportError:
|
||||
asizeof = None
|
||||
|
||||
# Function type: the type used for invalidation callbacks
|
||||
FT = TypeVar("FT", bound=Callable[..., Any])
|
||||
|
||||
@@ -55,15 +61,34 @@ def enumerate_leaves(node, depth):
|
||||
|
||||
|
||||
class _Node:
|
||||
__slots__ = ["prev_node", "next_node", "key", "value", "callbacks"]
|
||||
__slots__ = [
|
||||
"prev_node",
|
||||
"next_node",
|
||||
"key",
|
||||
"value",
|
||||
"callbacks",
|
||||
"memory",
|
||||
"allocated_ts",
|
||||
]
|
||||
|
||||
def __init__(self, prev_node, next_node, key, value, callbacks=set()):
|
||||
self.prev_node = prev_node
|
||||
self.next_node = next_node
|
||||
def __init__(self, prev_node, next_node, key, value, allocated_ts, callbacks=set()):
|
||||
self.key = key
|
||||
self.value = value
|
||||
self.callbacks = callbacks
|
||||
|
||||
if asizeof:
|
||||
data = getattr(value, "get_data_for_memory_size", None)
|
||||
if data:
|
||||
self.memory = asizeof.asizeof(key) + asizeof.asizeof(data)
|
||||
else:
|
||||
self.memory = asizeof.asizeof(key) + asizeof.asizeof(value)
|
||||
else:
|
||||
self.memory = 0
|
||||
|
||||
self.prev_node = prev_node
|
||||
self.next_node = next_node
|
||||
self.allocated_ts = allocated_ts
|
||||
|
||||
|
||||
class LruCache(Generic[KT, VT]):
|
||||
"""
|
||||
@@ -82,6 +107,7 @@ class LruCache(Generic[KT, VT]):
|
||||
size_callback: Optional[Callable] = None,
|
||||
metrics_collection_callback: Optional[Callable[[], None]] = None,
|
||||
apply_cache_factor_from_config: bool = True,
|
||||
reactor=None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
@@ -110,6 +136,11 @@ class LruCache(Generic[KT, VT]):
|
||||
apply_cache_factor_from_config (bool): If true, `max_size` will be
|
||||
multiplied by a cache factor derived from the homeserver config
|
||||
"""
|
||||
if reactor is None:
|
||||
from twisted.internet import reactor as _reactor
|
||||
|
||||
reactor = _reactor
|
||||
|
||||
cache = cache_type()
|
||||
self.cache = cache # Used for introspection.
|
||||
self.apply_cache_factor_from_config = apply_cache_factor_from_config
|
||||
@@ -141,13 +172,14 @@ class LruCache(Generic[KT, VT]):
|
||||
# this is exposed for access from outside this class
|
||||
self.metrics = metrics
|
||||
|
||||
list_root = _Node(None, None, None, None)
|
||||
list_root = _Node(None, None, None, None, -1)
|
||||
list_root.next_node = list_root
|
||||
list_root.prev_node = list_root
|
||||
|
||||
lock = threading.Lock()
|
||||
|
||||
def evict():
|
||||
ten_minutes_ago = int(reactor.seconds()) - 10 * 60
|
||||
while cache_len() > self.max_size:
|
||||
todelete = list_root.prev_node
|
||||
evicted_len = delete_node(todelete)
|
||||
@@ -155,6 +187,24 @@ class LruCache(Generic[KT, VT]):
|
||||
if metrics:
|
||||
metrics.inc_evictions(evicted_len)
|
||||
|
||||
todelete = list_root.prev_node
|
||||
while 0 < todelete.allocated_ts < ten_minutes_ago + 60:
|
||||
if list_root == todelete:
|
||||
break
|
||||
|
||||
if ten_minutes_ago < todelete.allocated_ts:
|
||||
todelete = todelete.prev_node
|
||||
continue
|
||||
|
||||
next_todelete = todelete.prev_node
|
||||
|
||||
evicted_len = delete_node(todelete)
|
||||
cache.pop(todelete.key, None)
|
||||
if metrics:
|
||||
metrics.inc_evictions(evicted_len)
|
||||
|
||||
todelete = next_todelete
|
||||
|
||||
def synchronized(f: FT) -> FT:
|
||||
@wraps(f)
|
||||
def inner(*args, **kwargs):
|
||||
@@ -179,11 +229,18 @@ class LruCache(Generic[KT, VT]):
|
||||
def add_node(key, value, callbacks=set()):
|
||||
prev_node = list_root
|
||||
next_node = prev_node.next_node
|
||||
node = _Node(prev_node, next_node, key, value, callbacks)
|
||||
|
||||
ts = int(reactor.seconds()) + random.randint(-60, 60)
|
||||
print("Allocating at", ts)
|
||||
|
||||
node = _Node(prev_node, next_node, key, value, ts, callbacks)
|
||||
prev_node.next_node = node
|
||||
next_node.prev_node = node
|
||||
cache[key] = node
|
||||
|
||||
if metrics:
|
||||
metrics.memory_usage += node.memory
|
||||
|
||||
if size_callback:
|
||||
cached_cache_len[0] += size_callback(node.value)
|
||||
|
||||
@@ -205,6 +262,9 @@ class LruCache(Generic[KT, VT]):
|
||||
prev_node.next_node = next_node
|
||||
next_node.prev_node = prev_node
|
||||
|
||||
if metrics:
|
||||
metrics.memory_usage -= node.memory
|
||||
|
||||
deleted_len = 1
|
||||
if size_callback:
|
||||
deleted_len = size_callback(node.value)
|
||||
@@ -242,8 +302,13 @@ class LruCache(Generic[KT, VT]):
|
||||
):
|
||||
node = cache.get(key, None)
|
||||
if node is not None:
|
||||
move_node_to_front(node)
|
||||
node.callbacks.update(callbacks)
|
||||
ten_minutes_ago = int(reactor.seconds()) - 10 * 60
|
||||
if 0 < node.allocated_ts < ten_minutes_ago:
|
||||
delete_node(node)
|
||||
cache.pop(node.key, None)
|
||||
else:
|
||||
move_node_to_front(node)
|
||||
node.callbacks.update(callbacks)
|
||||
if update_metrics and metrics:
|
||||
metrics.inc_hits()
|
||||
return node.value
|
||||
|
||||
@@ -30,6 +30,23 @@ class LruCacheTestCase(unittest.HomeserverTestCase):
|
||||
self.assertEquals(cache.get("key"), "value")
|
||||
self.assertEquals(cache["key"], "value")
|
||||
|
||||
def test_time_evict(self):
|
||||
self.reactor.advance(100 * 60)
|
||||
|
||||
cache = LruCache(100, reactor=self.reactor)
|
||||
cache["key"] = "value"
|
||||
cache["key2"] = "value2"
|
||||
|
||||
cache._on_resize()
|
||||
self.assertEquals(cache.get("key"), "value")
|
||||
|
||||
self.reactor.advance(20 * 60)
|
||||
|
||||
print(self.reactor.seconds())
|
||||
|
||||
cache._on_resize()
|
||||
self.assertEquals(cache.get("key"), None)
|
||||
|
||||
def test_eviction(self):
|
||||
cache = LruCache(2)
|
||||
cache[1] = 1
|
||||
|
||||
Reference in New Issue
Block a user