Compare commits

...

26 Commits

Author         SHA1        Message  Date
Erik Johnston  e2b2decd11  Fix  2021-04-02 11:28:51 +01:00
Erik Johnston  d088695f15  fix  2021-04-02 11:20:07 +01:00
Erik Johnston  0fddb9aacd  fix  2021-04-02 11:10:25 +01:00
Erik Johnston  b05667b610  Don't limit  2021-04-02 09:55:47 +01:00
Erik Johnston  8d308066db  fixup  2021-04-01 18:02:06 +01:00
Erik Johnston  2bb036eda7  fixup  2021-04-01 17:46:14 +01:00
Erik Johnston  5deb349a7f  Fixup  2021-04-01 17:33:24 +01:00
Erik Johnston  2e78ef4d22  Smear  2021-04-01 17:13:55 +01:00
Erik Johnston  0f82904fdd  fixup  2021-04-01 16:55:59 +01:00
Erik Johnston  f2e61d3cc1  Only evict 100 at once  2021-04-01 16:50:54 +01:00
Erik Johnston  3d6c982c41  Randomise  2021-04-01 16:50:08 +01:00
Erik Johnston  bebb7f0f60  Merge remote-tracking branch 'origin/develop' into erikj/cache_memory_usage  2021-04-01 16:33:32 +01:00
Erik Johnston  d6b8e471cf  Time out caches after ten minutes  2021-04-01 16:33:23 +01:00
Erik Johnston  e73881a439  Add metadata type  2021-03-29 18:54:38 +01:00
Erik Johnston  51a728ec24  Fixup  2021-03-29 18:35:57 +01:00
Erik Johnston  acd2778d61  Fixup  2021-03-29 18:34:21 +01:00
Erik Johnston  ffa6e96b5f  Fix  2021-03-29 18:31:49 +01:00
Erik Johnston  a1dfe34d86  Log errors  2021-03-29 18:29:25 +01:00
Erik Johnston  c232e16d23  Export jemalloc stats  2021-03-29 18:27:28 +01:00
Erik Johnston  ee36be5eef  Merge remote-tracking branch 'origin/develop' into erikj/cache_memory_usage  2021-03-29 14:24:11 +01:00
Erik Johnston  b3e99c25bf  Handle RulesForRoom and _JoinedHostsCache  2021-03-29 14:23:28 +01:00
Erik Johnston  f83ad8dd2d  Fixup  2021-03-29 11:12:00 +01:00
Erik Johnston  915163f72f  Ignore _JoinedHostsCache as it includes DataStore  2021-03-29 11:02:10 +01:00
Erik Johnston  9596641e1d  Fix  2021-03-26 17:51:11 +00:00
Erik Johnston  c4d468b69f  Don't ban __iter__  2021-03-26 17:45:36 +00:00
Erik Johnston  f38350fdda  Report cache memory usage  2021-03-26 17:38:50 +00:00
7 changed files with 159 additions and 10 deletions

View File

@@ -614,3 +614,55 @@ __all__ = [
"InFlightGauge",
"BucketCollector",
]
try:
import ctypes
import ctypes.util
jemalloc = ctypes.CDLL(ctypes.util.find_library("jemalloc"))
def get_val(name):
allocated = ctypes.c_size_t(0)
allocated_len = ctypes.c_size_t(ctypes.sizeof(allocated))
jemalloc.mallctl(
name.encode("ascii"),
ctypes.byref(allocated),
ctypes.byref(allocated_len),
None,
None,
)
return allocated.value
def refresh_stats():
epoch = ctypes.c_uint64(0)
jemalloc.mallctl(
b"epoch", None, None, ctypes.byref(epoch), ctypes.sizeof(epoch)
)
class JemallocCollector(object):
def collect(self):
refresh_stats()
g = GaugeMetricFamily(
"jemalloc_stats_app_memory",
"",
labels=["type"],
)
for t in (
"allocated",
"active",
"resident",
"mapped",
"retained",
"metadata",
):
g.add_metric([t], value=get_val(f"stats.{t}"))
yield g
REGISTRY.register(JemallocCollector())
except Exception:
logger.exception("Failed to start jemalloc metrics")
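
For reference, the mallctl call pattern the collector relies on can be exercised on its own. A minimal standalone sketch, assuming the interpreter is actually running under jemalloc (otherwise find_library returns None and the CDLL call fails, which is what the except block above guards against):

import ctypes
import ctypes.util

# int mallctl(const char *name, void *oldp, size_t *oldlenp, void *newp, size_t newlen)
jemalloc = ctypes.CDLL(ctypes.util.find_library("jemalloc"))

# Writing any value to "epoch" makes jemalloc refresh its cached statistics.
epoch = ctypes.c_uint64(1)
jemalloc.mallctl(b"epoch", None, None, ctypes.byref(epoch), ctypes.sizeof(epoch))

# Read "stats.allocated": bytes currently allocated by the application.
allocated = ctypes.c_size_t(0)
allocated_len = ctypes.c_size_t(ctypes.sizeof(allocated))
jemalloc.mallctl(
    b"stats.allocated",
    ctypes.byref(allocated),
    ctypes.byref(allocated_len),
    None,
    None,
)
print("allocated bytes:", allocated.value)

The collector above wraps exactly this read for each of the stats.* keys it exports under the jemalloc_stats_app_memory gauge.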

View File

@@ -345,6 +345,15 @@ class RulesForRoom:
        # to self around in the callback.
        self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id)

    def get_data_for_memory_size(self):
        return (
            self.member_map,
            self.rules_by_user,
            self.state_group,
            self.sequence,
            self.uninteresting_user_set,
        )

    async def get_rules(
        self, event: EventBase, context: EventContext
    ) -> Dict[str, List[Dict[str, dict]]]:
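
This hook, like the one added to _JoinedHostsCache below, lets the cache size only the attributes listed here rather than the whole object, which also holds references back to the data store (see the "Ignore _JoinedHostsCache as it includes DataStore" commit). A minimal sketch of measuring only what such a hook returns, using pympler's asizeof and a hypothetical ExampleCacheValue class rather than anything from Synapse:

from pympler import asizeof  # optional dependency, as in the lrucache.py change below


class ExampleCacheValue:
    """Hypothetical cached value that declares which attributes to count."""

    def __init__(self):
        self.member_map = {"@alice:example.com": 1, "@bob:example.com": 2}
        self.rules_by_user = {}
        self.store = object()  # stands in for heavyweight references we do not want to size

    def get_data_for_memory_size(self):
        # Only data this object actually owns, not self.store.
        return (self.member_map, self.rules_by_user)


value = ExampleCacheValue()
hook = getattr(value, "get_data_for_memory_size", None)
measured = hook() if hook else value
print(asizeof.asizeof(measured))  # recursively sizes the tuple, dicts, keys and values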

View File

@@ -1043,6 +1043,9 @@ class _JoinedHostsCache:
        self._len = 0

    def get_data_for_memory_size(self):
        return (self.hosts_to_joined_users, self.state_group)

    async def get_destinations(self, state_entry: "_StateCacheEntry") -> Set[str]:
        """Get set of destinations for a state entry

View File

@@ -231,8 +231,8 @@ class DomainSpecificString(
    # Deny iteration because it will bite you if you try to create a singleton
    # set by:
    #    users = set(user)
    def __iter__(self):
        raise ValueError("Attempted to iterate a %s" % (type(self).__name__,))

    # def __iter__(self):
    #     raise ValueError("Attempted to iterate a %s" % (type(self).__name__,))

    # Because this class is a namedtuple of strings and booleans, it is deeply
    # immutable.
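
The override is commented out on this branch (the "Don't ban __iter__" commit). The foot-gun the comment warns about, shown with a stand-in namedtuple rather than Synapse's DomainSpecificString:

from collections import namedtuple

# Stand-in for a DomainSpecificString-style type: a namedtuple of strings.
UserID = namedtuple("UserID", ["localpart", "domain"])
user = UserID("alice", "example.com")

# Because namedtuples are iterable, set(user) builds a set of the *fields*...
print(set(user))   # {'alice', 'example.com'}
# ...when what was almost certainly meant is a singleton set of the user itself.
print({user})      # {UserID(localpart='alice', domain='example.com')}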

View File

@@ -33,6 +33,7 @@ cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
cache_memory_usage = Gauge("synapse_util_caches_cache_memory_usage", "", ["name"])
response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
@@ -53,6 +54,7 @@ class CacheMetric:
    hits = attr.ib(default=0)
    misses = attr.ib(default=0)
    evicted_size = attr.ib(default=0)
    memory_usage = attr.ib(default=0)

    def inc_hits(self):
        self.hits += 1
@@ -80,6 +82,7 @@ class CacheMetric:
                cache_hits.labels(self._cache_name).set(self.hits)
                cache_evicted.labels(self._cache_name).set(self.evicted_size)
                cache_total.labels(self._cache_name).set(self.hits + self.misses)
                cache_memory_usage.labels(self._cache_name).set(self.memory_usage)
                if getattr(self._cache, "max_size", None):
                    cache_max_size.labels(self._cache_name).set(self._cache.max_size)
            if self._collect_callback:
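
The new gauge follows the same shape as the existing per-cache gauges: a labelled prometheus_client Gauge that collect() sets from the running memory_usage counter maintained by the LruCache changes below. A minimal illustration of that pattern outside Synapse, with an illustrative cache name and a throwaway registry:

from prometheus_client import CollectorRegistry, Gauge, generate_latest

registry = CollectorRegistry()
cache_memory_usage = Gauge(
    "synapse_util_caches_cache_memory_usage", "", ["name"], registry=registry
)

# What collect() effectively does for each cache, using its running counter:
cache_memory_usage.labels("some_cache").set(4096)

print(generate_latest(registry).decode())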

View File

@@ -27,6 +27,7 @@ from typing import (
    cast,
    overload,
)

import random

from typing_extensions import Literal
@@ -34,6 +35,11 @@ from synapse.config import cache as cache_config
from synapse.util.caches import CacheMetric, register_cache
from synapse.util.caches.treecache import TreeCache
try:
    from pympler import asizeof
except ImportError:
    asizeof = None
# Function type: the type used for invalidation callbacks
FT = TypeVar("FT", bound=Callable[..., Any])
@@ -55,15 +61,34 @@ def enumerate_leaves(node, depth):
class _Node:
    __slots__ = ["prev_node", "next_node", "key", "value", "callbacks"]
    __slots__ = [
        "prev_node",
        "next_node",
        "key",
        "value",
        "callbacks",
        "memory",
        "allocated_ts",
    ]

    def __init__(self, prev_node, next_node, key, value, callbacks=set()):
        self.prev_node = prev_node
        self.next_node = next_node
    def __init__(self, prev_node, next_node, key, value, allocated_ts, callbacks=set()):
        self.key = key
        self.value = value
        self.callbacks = callbacks

        if asizeof:
            data = getattr(value, "get_data_for_memory_size", None)
            if data:
                self.memory = asizeof.asizeof(key) + asizeof.asizeof(data)
            else:
                self.memory = asizeof.asizeof(key) + asizeof.asizeof(value)
        else:
            self.memory = 0

        self.prev_node = prev_node
        self.next_node = next_node
        self.allocated_ts = allocated_ts
class LruCache(Generic[KT, VT]):
"""
@@ -82,6 +107,7 @@ class LruCache(Generic[KT, VT]):
        size_callback: Optional[Callable] = None,
        metrics_collection_callback: Optional[Callable[[], None]] = None,
        apply_cache_factor_from_config: bool = True,
        reactor=None,
    ):
        """
        Args:
@@ -110,6 +136,11 @@ class LruCache(Generic[KT, VT]):
            apply_cache_factor_from_config (bool): If true, `max_size` will be
                multiplied by a cache factor derived from the homeserver config
        """
        if reactor is None:
            from twisted.internet import reactor as _reactor

            reactor = _reactor

        cache = cache_type()
        self.cache = cache  # Used for introspection.
        self.apply_cache_factor_from_config = apply_cache_factor_from_config
@@ -141,13 +172,14 @@ class LruCache(Generic[KT, VT]):
        # this is exposed for access from outside this class
        self.metrics = metrics

        list_root = _Node(None, None, None, None)
        list_root = _Node(None, None, None, None, -1)
        list_root.next_node = list_root
        list_root.prev_node = list_root

        lock = threading.Lock()

        def evict():
            ten_minutes_ago = int(reactor.seconds()) - 10 * 60
            while cache_len() > self.max_size:
                todelete = list_root.prev_node
                evicted_len = delete_node(todelete)
@@ -155,6 +187,24 @@ class LruCache(Generic[KT, VT]):
                if metrics:
                    metrics.inc_evictions(evicted_len)

            todelete = list_root.prev_node
            while 0 < todelete.allocated_ts < ten_minutes_ago + 60:
                if list_root == todelete:
                    break

                if ten_minutes_ago < todelete.allocated_ts:
                    todelete = todelete.prev_node
                    continue

                next_todelete = todelete.prev_node

                evicted_len = delete_node(todelete)
                cache.pop(todelete.key, None)
                if metrics:
                    metrics.inc_evictions(evicted_len)

                todelete = next_todelete

        def synchronized(f: FT) -> FT:
            @wraps(f)
            def inner(*args, **kwargs):
@@ -179,11 +229,18 @@ class LruCache(Generic[KT, VT]):
        def add_node(key, value, callbacks=set()):
            prev_node = list_root
            next_node = prev_node.next_node
            node = _Node(prev_node, next_node, key, value, callbacks)
            ts = int(reactor.seconds()) + random.randint(-60, 60)
            print("Allocating at", ts)
            node = _Node(prev_node, next_node, key, value, ts, callbacks)
            prev_node.next_node = node
            next_node.prev_node = node
            cache[key] = node

            if metrics:
                metrics.memory_usage += node.memory

            if size_callback:
                cached_cache_len[0] += size_callback(node.value)
@@ -205,6 +262,9 @@ class LruCache(Generic[KT, VT]):
            prev_node.next_node = next_node
            next_node.prev_node = prev_node

            if metrics:
                metrics.memory_usage -= node.memory

            deleted_len = 1
            if size_callback:
                deleted_len = size_callback(node.value)
@@ -242,8 +302,13 @@ class LruCache(Generic[KT, VT]):
        ):
            node = cache.get(key, None)
            if node is not None:
                move_node_to_front(node)
                node.callbacks.update(callbacks)
                ten_minutes_ago = int(reactor.seconds()) - 10 * 60
                if 0 < node.allocated_ts < ten_minutes_ago:
                    delete_node(node)
                    cache.pop(node.key, None)
                else:
                    move_node_to_front(node)
                    node.callbacks.update(callbacks)

                if update_metrics and metrics:
                    metrics.inc_hits()
                return node.value
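
Taken together, add_node stamps each entry with a timestamp smeared by +/- 60 seconds (the "Smear" and "Randomise" commits) and cache_get/evict treat entries more than ten minutes old as expired, with allocated_ts <= 0 meaning "no timestamp" (the list root is created with -1). A standalone sketch of that expiry rule, using hypothetical jittered_now/is_expired helpers and time.time() in place of the Twisted reactor:

import random
import time

TEN_MINUTES = 10 * 60


def jittered_now() -> int:
    # Smear allocation times by +/- 60s so entries inserted together do not
    # all expire in the same eviction pass.
    return int(time.time()) + random.randint(-60, 60)


def is_expired(allocated_ts: int, now: int) -> bool:
    ten_minutes_ago = now - TEN_MINUTES
    # allocated_ts <= 0 means "no timestamp" (e.g. the list root's -1) and is
    # never treated as expired.
    return 0 < allocated_ts < ten_minutes_ago


ts = jittered_now()
print(is_expired(ts, int(time.time())))            # False: freshly allocated
print(is_expired(ts, int(time.time()) + 12 * 60))  # True: well past ten minutes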

View File

@@ -30,6 +30,23 @@ class LruCacheTestCase(unittest.HomeserverTestCase):
        self.assertEquals(cache.get("key"), "value")
        self.assertEquals(cache["key"], "value")

    def test_time_evict(self):
        self.reactor.advance(100 * 60)

        cache = LruCache(100, reactor=self.reactor)
        cache["key"] = "value"
        cache["key2"] = "value2"

        cache._on_resize()

        self.assertEquals(cache.get("key"), "value")

        self.reactor.advance(20 * 60)
        print(self.reactor.seconds())

        cache._on_resize()

        self.assertEquals(cache.get("key"), None)

    def test_eviction(self):
        cache = LruCache(2)
        cache[1] = 1