Compare commits

...

13 Commits

Author SHA1 Message Date
Erik Johnston 5de571987e WIP: Type TreeCache 2022-07-18 13:05:07 +01:00
Erik Johnston 129691f190 Comment TreeCache 2022-07-18 10:49:12 +01:00
Erik Johnston 462db2a171 Comment LruCache 2022-07-18 10:48:24 +01:00
Erik Johnston 7f7b36d56d Comment LruCache 2022-07-18 10:40:36 +01:00
Erik Johnston 057ae8b61c Comments 2022-07-17 11:34:34 +01:00
Erik Johnston 7aceec3ed9 Fix up 2022-07-15 16:52:16 +01:00
Erik Johnston cad555f07c Better stuff 2022-07-15 16:31:53 +01:00
Erik Johnston 23c2f394a5 Fix mypy 2022-07-15 16:30:56 +01:00
Erik Johnston 602a81f5a2 don't update access 2022-07-15 16:24:10 +01:00
Erik Johnston f046366d2a Fix test 2022-07-15 15:54:01 +01:00
Erik Johnston a22716c5c5 Fix literal 2022-07-15 15:31:57 +01:00
Erik Johnston 40a8fba5f6 Newsfile 2022-07-15 15:27:03 +01:00
Erik Johnston 326a175987 Make DictionaryCache have better expiry properties 2022-07-15 15:26:02 +01:00
6 changed files with 372 additions and 73 deletions

changelog.d/13292.misc Normal file

@@ -0,0 +1 @@
Make `DictionaryCache` expire full entries if they haven't been queried in a while, even if specific keys have been queried recently.


@@ -202,7 +202,14 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
requests state from the cache, if False we need to query the DB for the
missing state.
"""
cache_entry = cache.get(group)
# If we are asked explicitly for a subset of keys, we only ask for those
# from the cache. This ensures that the `DictionaryCache` can make
# better decisions about what to cache and what to expire.
dict_keys = None
if not state_filter.has_wildcards():
dict_keys = state_filter.concrete_types()
cache_entry = cache.get(group, dict_keys=dict_keys)
state_dict_ids = cache_entry.value
if cache_entry.full or state_filter.is_full():
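
For illustration, a hedged sketch of the call-site logic above. It assumes a checkout of this branch; at the time of this PR `StateFilter` lived in `synapse.storage.state`, and the member event below is made up:

from synapse.storage.state import StateFilter

state_filter = StateFilter.from_types([("m.room.member", "@alice:example.org")])

dict_keys = None
if not state_filter.has_wildcards():
    # Only concrete (event type, state_key) pairs were requested, so ask the
    # cache for exactly those. This lets the DictionaryCache track per-key
    # usage instead of pinning the full entry on every lookup.
    dict_keys = state_filter.concrete_types()

# cache.get(group, dict_keys=dict_keys) then returns a DictionaryEntry that
# covers just the requested keys (or the full dict for a wildcard filter).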


@@ -14,11 +14,13 @@
import enum
import logging
import threading
from typing import Any, Dict, Generic, Iterable, Optional, Set, TypeVar
from typing import Any, Dict, Generic, Iterable, Optional, Set, Tuple, TypeVar, Union
import attr
from typing_extensions import Literal
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_items
logger = logging.getLogger(__name__)
@@ -53,20 +55,67 @@ class DictionaryEntry: # should be: Generic[DKT, DV].
return len(self.value)
class _FullCacheKey(enum.Enum):
"""The key we use to cache the full dict."""
KEY = object()
class _Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
# type of a dictionary lookup.
sentinel = object()
class _PerKeyValue(Generic[DV]):
"""The cached value of a dictionary key. If `value` is the sentinel,
indicates that the requested key is known to *not* be in the full dict.
"""
__slots__ = ["value"]
def __init__(self, value: Union[DV, Literal[_Sentinel.sentinel]]) -> None:
self.value = value
def __len__(self) -> int:
# We add a `__len__` implementation as we use this class in a cache
# where the values are variable length.
return 1
class DictionaryCache(Generic[KT, DKT, DV]):
"""Caches key -> dictionary lookups, supporting caching partial dicts, i.e.
fetching a subset of dictionary keys for a particular key.
"""
def __init__(self, name: str, max_entries: int = 1000):
self.cache: LruCache[KT, DictionaryEntry] = LruCache(
max_size=max_entries, cache_name=name, size_callback=len
# We use a single cache to cache two different types of entries:
# 1. Map from (key, dict_key) -> dict value (or sentinel, indicating
# the key doesn't exist in the dict); and
# 2. Map from (key, _FullCacheKey.KEY) -> full dict.
#
# The former is used when explicit keys of the dictionary are looked up,
# and the latter when the full dictionary is requested.
#
# If explicit keys are requested and not in the cache, we then look to
# see if we have the full dict and use that if we do. Each key found in
# the full dict is then added into the cache.
#
# This setup allows the `LruCache` to prune the full dict entries if
# they haven't been used in a while, even when there have been recent
# queries for subsets of the dict.
#
# Typing:
# * A key of `(KT, DKT)` has a value of `_PerKeyValue`
# * A key of `(KT, _FullCacheKey.KEY)` has a value of `Dict[DKT, DV]`
self.cache: LruCache[
Tuple[KT, Union[DKT, Literal[_FullCacheKey.KEY]]],
Union[_PerKeyValue, Dict[DKT, DV]],
] = LruCache(
max_size=max_entries,
cache_name=name,
cache_type=TreeCache,
size_callback=len,
)
self.name = name
@@ -96,20 +145,97 @@ class DictionaryCache(Generic[KT, DKT, DV]):
Returns:
DictionaryEntry
"""
entry = self.cache.get(key, _Sentinel.sentinel)
if entry is not _Sentinel.sentinel:
if dict_keys is None:
return DictionaryEntry(
entry.full, entry.known_absent, dict(entry.value)
)
else:
return DictionaryEntry(
entry.full,
entry.known_absent,
{k: entry.value[k] for k in dict_keys if k in entry.value},
)
return DictionaryEntry(False, set(), {})
if dict_keys is None:
# First we check if we have cached the full dict.
entry = self.cache.get((key, _FullCacheKey.KEY), _Sentinel.sentinel)
if entry is not _Sentinel.sentinel:
assert isinstance(entry, dict)
return DictionaryEntry(True, set(), entry)
# If not, check if we have cached any of the dict keys.
all_entries = self.cache.get_multi(
(key,),
_Sentinel.sentinel,
)
if all_entries is _Sentinel.sentinel:
return DictionaryEntry(False, set(), {})
# If there are entries, we need to unwrap the returned cache nodes
# and `_PerKeyValue`s into the `DictionaryEntry`.
values = {}
known_absent = set()
for dict_key, dict_value in iterate_tree_cache_items((), all_entries):
dict_key = dict_key[0]
dict_value = dict_value.value
# We have explicitly looked for a full cache key, so we
# shouldn't see one.
assert dict_key != _FullCacheKey.KEY
# ... therefore the values must be `_PerKeyValue`
assert isinstance(dict_value, _PerKeyValue)
if dict_value.value is _Sentinel.sentinel:
known_absent.add(dict_key)
else:
values[dict_key] = dict_value.value
return DictionaryEntry(False, known_absent, values)
# We are being asked for a subset of keys.
# First, check the cache for each requested dict key, tracking which
# ones we couldn't find.
values = {}
known_absent = set()
missing = set()
for dict_key in dict_keys:
entry = self.cache.get((key, dict_key), _Sentinel.sentinel)
if entry is _Sentinel.sentinel:
missing.add(dict_key)
continue
assert isinstance(entry, _PerKeyValue)
if entry.value is _Sentinel.sentinel:
known_absent.add(dict_key)
else:
values[dict_key] = entry.value
# If we found everything, we can return immediately.
if not missing:
return DictionaryEntry(False, known_absent, values)
# If we are missing any keys, check if we happen to have the full dict
# in the cache.
#
# We don't update the last access time for this cache fetch, as we
# aren't explicitly interested in the full dict and so we don't want
# requests for explicit dict keys to keep the full dict in the cache.
entry = self.cache.get(
(key, _FullCacheKey.KEY),
_Sentinel.sentinel,
update_last_access=False,
)
if entry is _Sentinel.sentinel:
# Not in the cache, return the subset of keys we found.
return DictionaryEntry(False, known_absent, values)
# We have the full dict!
assert isinstance(entry, dict)
values = {}
for dict_key in dict_keys:
# We explicitly add each dict key to the cache, so that cache hit
# rates for each key can be tracked separately.
value = entry.get(dict_key, _Sentinel.sentinel) # type: ignore[arg-type]
self.cache[(key, dict_key)] = _PerKeyValue(value)
if value is not _Sentinel.sentinel:
values[dict_key] = value
return DictionaryEntry(True, set(), values)
def invalidate(self, key: KT) -> None:
self.check_thread()
@@ -117,7 +243,9 @@ class DictionaryCache(Generic[KT, DKT, DV]):
# Increment the sequence number so that any SELECT statements that
# raced with the INSERT don't update the cache (SYN-369)
self.sequence += 1
self.cache.pop(key, None)
# Del-multi accepts truncated tuples.
self.cache.del_multi((key,)) # type: ignore[arg-type]
def invalidate_all(self) -> None:
self.check_thread()
@@ -149,20 +277,27 @@ class DictionaryCache(Generic[KT, DKT, DV]):
# Only update the cache if the cache's sequence number matches the
# number that the cache had before the SELECT was started (SYN-369)
if fetched_keys is None:
self._insert(key, value, set())
self.cache[(key, _FullCacheKey.KEY)] = value
else:
self._update_or_insert(key, value, fetched_keys)
self._update_subset(key, value, fetched_keys)
def _update_or_insert(
self, key: KT, value: Dict[DKT, DV], known_absent: Iterable[DKT]
def _update_subset(
self, key: KT, value: Dict[DKT, DV], fetched_keys: Iterable[DKT]
) -> None:
# We pop and reinsert as we need to tell the cache the size may have
# changed
"""Add the given dictionary values as explicit keys in the cache.
entry: DictionaryEntry = self.cache.pop(key, DictionaryEntry(False, set(), {}))
entry.value.update(value)
entry.known_absent.update(known_absent)
self.cache[key] = entry
Args:
key
value: The dictionary with all the values that we should cache
fetched_keys: The full set of keys that were looked up; any keys
here not in `value` should be marked as "known absent".
"""
def _insert(self, key: KT, value: Dict[DKT, DV], known_absent: Set[DKT]) -> None:
self.cache[key] = DictionaryEntry(True, known_absent, value)
for dict_key, dict_value in value.items():
self.cache[(key, dict_key)] = _PerKeyValue(dict_value)
for dict_key in fetched_keys:
if (key, dict_key) in self.cache:
continue
self.cache[(key, dict_key)] = _PerKeyValue(_Sentinel.sentinel)
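
Putting the pieces above together, a hedged usage sketch (assuming this branch is importable; the `update(sequence, key, value, fetched_keys)` signature matches the hunk above):

from synapse.util.caches.dictionary_cache import DictionaryCache

cache: DictionaryCache[str, str, int] = DictionaryCache("example", max_entries=100)

# Cache the full dict for "group1"; fetched_keys=None means "this is everything".
cache.update(cache.sequence, "group1", {"a": 1, "b": 2})

# A partial lookup misses the per-key entries, falls back to the full dict
# *without* bumping its last-access time, and back-fills per-key entries.
entry = cache.get("group1", dict_keys=["a", "c"])
assert entry.full and entry.value == {"a": 1}

# A second lookup is served entirely from the per-key entries, so "c" is now
# reported as known-absent and the full dict entry is left untouched.
entry = cache.get("group1", dict_keys=["a", "c"])
assert not entry.full
assert entry.value == {"a": 1} and entry.known_absent == {"c"}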


@@ -44,7 +44,11 @@ from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import get_jemalloc_stats
from synapse.util import Clock, caches
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.caches.treecache import (
TreeCache,
TreeCacheNode,
iterate_tree_cache_entry,
)
from synapse.util.linked_list import ListNode
if TYPE_CHECKING:
@@ -413,7 +417,7 @@ class LruCache(Generic[KT, VT]):
else:
real_clock = clock
cache: Union[Dict[KT, _Node[KT, VT]], TreeCache] = cache_type()
cache: Union[Dict[KT, _Node[KT, VT]], TreeCache[_Node[KT, VT]]] = cache_type()
self.cache = cache # Used for introspection.
self.apply_cache_factor_from_config = apply_cache_factor_from_config
@@ -537,6 +541,7 @@ class LruCache(Generic[KT, VT]):
default: Literal[None] = None,
callbacks: Collection[Callable[[], None]] = ...,
update_metrics: bool = ...,
update_last_access: bool = ...,
) -> Optional[VT]:
...
@@ -546,6 +551,7 @@ class LruCache(Generic[KT, VT]):
default: T,
callbacks: Collection[Callable[[], None]] = ...,
update_metrics: bool = ...,
update_last_access: bool = ...,
) -> Union[T, VT]:
...
@@ -555,10 +561,27 @@ class LruCache(Generic[KT, VT]):
default: Optional[T] = None,
callbacks: Collection[Callable[[], None]] = (),
update_metrics: bool = True,
update_last_access: bool = True,
) -> Union[None, T, VT]:
"""Lookup a key in the cache
Args:
key
default
callbacks: A collection of callbacks that will fire when the
node is removed from the cache (either due to invalidation
or expiry).
update_metrics: Whether to update the hit rate metrics
update_last_access: Whether to update the last access metrics
on a node if successfully fetched. These metrics are used
to determine when to remove the node from the cache. Set
to False if this fetch should *not* prevent a node from
being expired.
"""
node = cache.get(key, None)
if node is not None:
move_node_to_front(node)
if update_last_access:
move_node_to_front(node)
node.add_callbacks(callbacks)
if update_metrics and metrics:
metrics.inc_hits()
@@ -568,6 +591,42 @@ class LruCache(Generic[KT, VT]):
metrics.inc_misses()
return default
@overload
def cache_get_multi(
key: tuple,
default: Literal[None] = None,
update_metrics: bool = True,
) -> Union[None, TreeCacheNode]:
...
@overload
def cache_get_multi(
key: tuple,
default: T,
update_metrics: bool = True,
) -> Union[T, TreeCacheNode]:
...
@synchronized
def cache_get_multi(
key: tuple,
default: Optional[T] = None,
update_metrics: bool = True,
) -> Union[None, T, TreeCacheNode]:
"""Used only for `TreeCache` to fetch a subtree."""
assert isinstance(cache, TreeCache)
node = cache.get(key, None)
if node is not None:
if update_metrics and metrics:
metrics.inc_hits()
return node
else:
if update_metrics and metrics:
metrics.inc_misses()
return default
@synchronized
def cache_set(
key: KT, value: VT, callbacks: Collection[Callable[[], None]] = ()
@@ -674,6 +733,8 @@ class LruCache(Generic[KT, VT]):
self.setdefault = cache_set_default
self.pop = cache_pop
self.del_multi = cache_del_multi
if cache_type is TreeCache:
self.get_multi = cache_get_multi
# `invalidate` is exposed for consistency with DeferredCache, so that it can be
# invalidated by the cache invalidation replication stream.
self.invalidate = cache_del_multi
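
A hedged sketch of the two `LruCache` additions (`update_last_access` and `get_multi`), assuming a `TreeCache`-backed instance like the one `DictionaryCache` now constructs:

from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_items

cache: LruCache = LruCache(max_size=10, cache_type=TreeCache)
cache[("group1", "a")] = 1
cache[("group1", "b")] = 2

# Peek without promoting the entry: the hit should not stop the node from
# being expired by the LRU policy.
assert cache.get(("group1", "a"), update_last_access=False) == 1

# get_multi is only attached when cache_type is TreeCache. It returns the
# ("group1",) subtree; its leaves are LruCache nodes wrapping the values.
subtree = cache.get_multi(("group1",))
for full_key, node in iterate_tree_cache_items(("group1",), subtree):
    print(full_key, node.value)  # ("group1", "a") 1 and ("group1", "b") 2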


@@ -12,18 +12,59 @@
# See the License for the specific language governing permissions and
# limitations under the License.
SENTINEL = object()
from enum import Enum
from typing import (
Any,
Dict,
Generator,
Generic,
List,
Literal,
Optional,
Tuple,
TypeVar,
Union,
overload,
)
class TreeCacheNode(dict):
class Sentinel(Enum):
sentinel = object()
V = TypeVar("V")
T = TypeVar("T")
class TreeCacheNode(Generic[V]):
"""The type of nodes in our tree.
Has its own type so we can distinguish it from real dicts that are stored at the
leaves.
Either a leaf node or a branch node.
"""
__slots__ = ["leaf_value", "sub_tree"]
class TreeCache:
def __init__(
self,
leaf_value: Union[V, Literal[Sentinel.sentinel]] = Sentinel.sentinel,
sub_tree: Optional[Dict[Any, "TreeCacheNode[V]"]] = None,
) -> None:
if leaf_value is Sentinel.sentinel and sub_tree is None:
raise Exception("One of leaf or sub tree must be set")
self.leaf_value: Union[V, Literal[Sentinel.sentinel]] = leaf_value
self.sub_tree: Optional[Dict[Any, "TreeCacheNode[V]"]] = sub_tree
@staticmethod
def leaf(value: V) -> "TreeCacheNode[V]":
return TreeCacheNode(leaf_value=value)
@staticmethod
def empty_branch() -> "TreeCacheNode[V]":
return TreeCacheNode(sub_tree={})
class TreeCache(Generic[V]):
"""
Tree-based backing store for LruCache. Allows subtrees of data to be deleted
efficiently.
@@ -35,15 +76,15 @@ class TreeCache:
def __init__(self) -> None:
self.size: int = 0
self.root = TreeCacheNode()
self.root: TreeCacheNode[V] = TreeCacheNode.empty_branch()
def __setitem__(self, key, value) -> None:
def __setitem__(self, key: tuple, value: V) -> None:
self.set(key, value)
def __contains__(self, key) -> bool:
return self.get(key, SENTINEL) is not SENTINEL
def __contains__(self, key: tuple) -> bool:
return self.get(key, None) is not None
def set(self, key, value) -> None:
def set(self, key: tuple, value: V) -> None:
if isinstance(value, TreeCacheNode):
# this would mean we couldn't tell where our tree ended and the value
# started.
@@ -51,31 +92,56 @@ class TreeCache:
node = self.root
for k in key[:-1]:
next_node = node.get(k, SENTINEL)
if next_node is SENTINEL:
next_node = node[k] = TreeCacheNode()
elif not isinstance(next_node, TreeCacheNode):
# this suggests that the caller is not being consistent with its key
# length.
sub_tree = node.sub_tree
if sub_tree is None:
raise ValueError("value conflicts with an existing subtree")
node = next_node
node[key[-1]] = value
next_node = sub_tree.get(k, None)
if next_node is None:
node = TreeCacheNode.empty_branch()
sub_tree[k] = node
else:
node = next_node
if node.sub_tree is None:
raise ValueError("value conflicts with an existing subtree")
node.sub_tree[key[-1]] = TreeCacheNode.leaf(value)
self.size += 1
def get(self, key, default=None):
@overload
def get(self, key: tuple, default: Literal[None] = None) -> Union[None, V]:
...
@overload
def get(self, key: tuple, default: T) -> Union[T, V]:
...
def get(self, key: tuple, default: Optional[T] = None) -> Union[None, T, V]:
node = self.root
for k in key[:-1]:
node = node.get(k, None)
if node is None:
for k in key:
sub_tree = node.sub_tree
if sub_tree is None:
raise ValueError("get() key too long")
next_node = sub_tree.get(k, None)
if next_node is None:
return default
return node.get(key[-1], default)
node = next_node
if node.leaf_value is Sentinel.sentinel:
# A truncated key lands on a branch node; return the subtree itself
# so that callers such as `cache_get_multi` can iterate over it.
return node
return node.leaf_value
def clear(self) -> None:
self.size = 0
self.root = TreeCacheNode.empty_branch()
def pop(self, key, default=None):
def pop(
self, key: tuple, default: Optional[T] = None
) -> Union[None, T, V, TreeCacheNode[V]]:
"""Remove the given key, or subkey, from the cache
Args:
@@ -91,20 +157,25 @@ class TreeCache:
raise TypeError("The cache key must be a tuple not %r" % (type(key),))
# a list of the nodes we have touched on the way down the tree
nodes = []
nodes: List[TreeCacheNode[V]] = []
node = self.root
for k in key[:-1]:
node = node.get(k, None)
if node is None:
return default
if not isinstance(node, TreeCacheNode):
# we've gone off the end of the tree
sub_tree = node.sub_tree
if sub_tree is None:
raise ValueError("pop() key too long")
nodes.append(node) # don't add the root node
popped = node.pop(key[-1], SENTINEL)
if popped is SENTINEL:
return default
next_node = sub_tree.get(k, None)
if next_node is None:
return default
node = next_node
nodes.append(node)
if node.sub_tree is None:
raise ValueError("pop() key too long")
popped = node.sub_tree.pop(key[-1], None)
if popped is None:
return default
# working back up the tree, clear out any nodes that are now empty
node_and_keys = list(zip(nodes, key))
@@ -116,8 +187,13 @@ class TreeCache:
if n:
break
# found an empty node: remove it from its parent, and loop.
node_and_keys[i + 1][0].pop(k)
node = node_and_keys[i + 1][0]
# We added it to the list, so we already know it's a branch node.
assert node.sub_tree is not None
node.sub_tree.pop(k)
cnt = sum(1 for _ in iterate_tree_cache_entry(popped))
self.size -= cnt
@@ -130,12 +206,31 @@ class TreeCache:
return self.size
def iterate_tree_cache_entry(d):
def iterate_tree_cache_entry(d: TreeCacheNode[V]) -> Generator[V, None, None]:
"""Helper function to iterate over the leaves of a tree, i.e. a dict of that
can contain dicts.
"""
if isinstance(d, TreeCacheNode):
for value_d in d.values():
if d.sub_tree is not None:
for value_d in d.sub_tree.values():
yield from iterate_tree_cache_entry(value_d)
else:
yield d
assert d.leaf_value is not Sentinel.sentinel
yield d.leaf_value
def iterate_tree_cache_items(
key: tuple, value: TreeCacheNode[V]
) -> Generator[Tuple[tuple, V], None, None]:
"""Helper function to iterate over the leaves of a tree, i.e. a dict of that
can contain dicts.
Returns:
A generator yielding key/value pairs.
"""
if value.sub_tree is not None:
for sub_key, sub_value in value.sub_tree.items():
yield from iterate_tree_cache_items((*key, sub_key), sub_value)
else:
assert value.leaf_value is not Sentinel.sentinel
yield key, value.leaf_value
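
Finally, a hedged sketch of the reworked tree on its own (no `LruCache` wrapper, so the leaves are the raw values):

from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry

tree: TreeCache[int] = TreeCache()
tree[("room1", "member", "@alice:test")] = 1
tree[("room1", "member", "@bob:test")] = 2
tree[("room2", "member", "@alice:test")] = 3
assert len(tree) == 3

# Popping a truncated key detaches the whole ("room1",) subtree in one
# operation and returns its root TreeCacheNode; iterating that node yields
# the evicted leaf values.
popped = tree.pop(("room1",))
assert sorted(iterate_tree_cache_entry(popped)) == [1, 2]
assert len(tree) == 1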


@@ -369,7 +369,7 @@ class StateStoreTestCase(HomeserverTestCase):
state_dict_ids = cache_entry.value
self.assertEqual(cache_entry.full, False)
self.assertEqual(cache_entry.known_absent, {(e1.type, e1.state_key)})
self.assertEqual(cache_entry.known_absent, set())
self.assertDictEqual(state_dict_ids, {(e1.type, e1.state_key): e1.event_id})
############################################