Compare commits

...

3 Commits

Author SHA1 Message Date
Andrew Morgan
8acd2c01bc lil fix 2022-09-26 16:13:53 +01:00
Andrew Morgan
f1d98d3b70 wip2 2022-09-22 15:54:30 +01:00
Andrew Morgan
6ff8ba5fc6 wip 2022-09-21 17:37:38 +01:00
7 changed files with 349 additions and 20 deletions

View File

@@ -194,7 +194,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase
# changed its content in the database. We can't call
# self._invalidate_cache_and_stream because self.get_event_cache isn't of the
# right type.
self.invalidate_get_event_cache_after_txn(txn, event.event_id)
self.invalidate_get_event_cache_by_event_id_after_txn(txn, event.event_id)
# Send that invalidation to replication so that other workers also invalidate
# the event cache.
self._send_invalidation_to_replication(

View File

@@ -1294,8 +1294,10 @@ class PersistEventsStore:
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
# Remove any existing cache entries for the event_ids
self.store.invalidate_get_event_cache_by_event_id_after_txn(
txn, event.event_id
)
# Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
@@ -1703,7 +1705,7 @@ class PersistEventsStore:
_invalidate_caches_for_event.
"""
assert event.redacts is not None
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
self.store.invalidate_get_event_cache_by_event_id_after_txn(txn, event.redacts)
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))

View File

@@ -80,6 +80,7 @@ from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dual_lookup_cache import DualLookupCache
from synapse.util.caches.lrucache import AsyncLruCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
@@ -245,6 +246,8 @@ class EventsWorkerStore(SQLBaseStore):
] = AsyncLruCache(
cache_name="*getEvent*",
max_size=hs.config.caches.event_cache_size,
cache_type=DualLookupCache,
dual_lookup_secondary_key_function=lambda v: (v.event.room_id,),
)
# Map from event ID to a deferred that will result in a map from event
@@ -733,7 +736,7 @@ class EventsWorkerStore(SQLBaseStore):
return event_entry_map
def invalidate_get_event_cache_after_txn(
def invalidate_get_event_cache_by_event_id_after_txn(
self, txn: LoggingTransaction, event_id: str
) -> None:
"""
@@ -747,10 +750,31 @@ class EventsWorkerStore(SQLBaseStore):
event_id: the event ID to be invalidated from caches
"""
txn.async_call_after(self._invalidate_async_get_event_cache, event_id)
txn.call_after(self._invalidate_local_get_event_cache, event_id)
txn.async_call_after(
self._invalidate_async_get_event_cache_by_event_id, event_id
)
txn.call_after(self._invalidate_local_get_event_cache_by_event_id, event_id)
async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
def invalidate_get_event_cache_by_room_id_after_txn(
self, txn: LoggingTransaction, room_id: str
) -> None:
"""
Prepares a database transaction to invalidate the get event cache for a given
room ID when executed successfully. This is achieved by attaching two callbacks
to the transaction, one to invalidate the async cache and one for the in memory
sync cache (importantly called in that order).
Arguments:
txn: the database transaction to attach the callbacks to.
room_id: the room ID to invalidate all associated event caches for.
"""
txn.async_call_after(self._invalidate_async_get_event_cache_by_room_id, room_id)
txn.call_after(self._invalidate_local_get_event_cache_by_room_id, room_id)
async def _invalidate_async_get_event_cache_by_event_id(
self, event_id: str
) -> None:
"""
Invalidates an event in the asyncronous get event cache, which may be remote.
@@ -760,7 +784,18 @@ class EventsWorkerStore(SQLBaseStore):
await self._get_event_cache.invalidate((event_id,))
def _invalidate_local_get_event_cache(self, event_id: str) -> None:
async def _invalidate_async_get_event_cache_by_room_id(self, room_id: str) -> None:
"""
Invalidates all events associated with a given room in the asyncronous get event
cache, which may be remote.
Arguments:
room_id: the room ID to invalidate associated events of.
"""
await self._get_event_cache.invalidate((room_id,))
def _invalidate_local_get_event_cache_by_event_id(self, event_id: str) -> None:
"""
Invalidates an event in local in-memory get event caches.
@@ -772,6 +807,18 @@ class EventsWorkerStore(SQLBaseStore):
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)
def _invalidate_local_get_event_cache_by_room_id(self, room_id: str) -> None:
"""
Invalidates all events associated with a given room ID in local in-memory
get event caches.
Arguments:
room_id: the room ID to invalidate events of.
"""
self._get_event_cache.invalidate_local((room_id,))
# TODO: invalidate _event_ref and _current_event_fetches. How?
async def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
@@ -2284,7 +2331,7 @@ class EventsWorkerStore(SQLBaseStore):
updatevalues={"rejection_reason": rejection_reason},
)
self.invalidate_get_event_cache_after_txn(txn, event_id)
self.invalidate_get_event_cache_by_event_id_after_txn(txn, event_id)
# TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
# call '_send_invalidation_to_replication', but we actually need the other

View File

@@ -304,7 +304,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
self._invalidate_cache_and_stream(
txn, self.have_seen_event, (room_id, event_id)
)
self.invalidate_get_event_cache_after_txn(txn, event_id)
self.invalidate_get_event_cache_by_event_id_after_txn(txn, event_id)
logger.info("[purge] done")
@@ -478,6 +478,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# XXX: as with purge_history, this is racy, but no worse than other races
# that already exist.
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
self._invalidate_local_get_event_cache_by_room_id(room_id)
logger.info("[purge] done")

View File

@@ -0,0 +1,238 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import (
Callable,
Dict,
Generic,
ItemsView,
List,
Optional,
TypeVar,
Union,
ValuesView,
)
# Used to discern between a value not existing in a map, or the value being 'None'.
SENTINEL = object()
# The type of the primary dict's keys.
PKT = TypeVar("PKT")
# The type of the primary dict's values.
PVT = TypeVar("PVT")
# The type of the secondary dict's keys.
SKT = TypeVar("SKT")
logger = logging.getLogger(__name__)
class SecondarySet(set):
"""
Used to differentiate between an entry in the secondary_dict, and a set stored
in the primary_dict. This is necessary as pop() can return either.
"""
class DualLookupCache(Generic[PKT, PVT, SKT]):
"""
A backing store for LruCache that supports multiple entry points.
Allows subsets of data to be deleted efficiently without requiring extra
information to query.
The data structure is two dictionaries:
* primary_dict containing a mapping of primary_key -> value.
* secondary_dict containing a mapping of secondary_key -> set of primary_key.
On insert, a mapping in the primary_dict must be created. A mapping in the
secondary_dict from a secondary_key to (a set containing) the same
primary_key will be made. The secondary_key
must be derived from the inserted value via a lambda function provided at cache
initialisation. This is so invalidated entries in the primary_dict may automatically
invalidate those in the secondary_dict. The secondary_key may be associated with one
or more primary_key's.
This creates an interface which allows for efficient lookups of a value given
a primary_key, as well as efficient invalidation of a subset of mapping in the
primary_dict given a secondary_key. A primary_key may not be associated with more
than one secondary_key.
As a worked example, consider storing a cache of room events. We could configure
the cache to store mappings between EventIDs and EventBase in the primary_dict,
while storing a mapping between room IDs and event IDs as the secondary_dict:
primary_dict: EventID -> EventBase
secondary_dict: RoomID -> {EventID, EventID, ...}
This would be efficient for the following operations:
* Given an EventID, look up the associated EventBase, and thus the roomID.
* Given a RoomID, invalidate all primary_dict entries for events in that room.
Since this is intended as a backing store for LRUCache, when it came time to evict
an entry from the primary_dict (EventID -> EventBase), the secondary_key could be
derived from a provided lambda function:
secondary_key = lambda event_base: event_base.room_id
The EventID set under room_id would then have the appropriate EventID entry evicted.
"""
def __init__(self, secondary_key_function: Callable[[PVT], SKT]) -> None:
self._primary_dict: Dict[PKT, PVT] = {}
self._secondary_dict: Dict[SKT, SecondarySet] = {}
self._secondary_key_function = secondary_key_function
def __setitem__(self, key: PKT, value: PVT) -> None:
self.set(key, value)
def __contains__(self, key: PKT) -> bool:
return key in self._primary_dict
def set(self, key: PKT, value: PVT) -> None:
"""Add an entry to the cache.
Will add an entry to the primary_dict consisting of key->value, as well as append
to the set referred to by secondary_key_function(value) in the secondary_dict.
Args:
key: The key for a new mapping in primary_dict.
value: The value for a new mapping in primary_dict.
"""
# Create an entry in the primary_dict.
self._primary_dict[key] = value
# Derive the secondary_key to use from the given primary_value.
secondary_key = self._secondary_key_function(value)
# TODO: If the lambda function resolves to None, don't insert an entry?
# And create a mapping in the secondary_dict to a set containing the
# primary_key, creating the set if necessary.
secondary_key_set = self._secondary_dict.setdefault(
secondary_key, SecondarySet()
)
secondary_key_set.add(key)
logger.info("*** Insert into primary_dict: %s: %s", key, value)
logger.info("*** Insert into secondary_dict: %s: %s", secondary_key, key)
def get(self, key: PKT, default: Optional[PVT] = None) -> Optional[PVT]:
"""Retrieve a value from the cache if it exists. If not, return the default
value.
This method simply pulls entries from the primary_dict.
# TODO: Any use cases for externally getting entries from the secondary_dict?
Args:
key: The key to search the cache for.
default: The default value to return if the given key is not found.
Returns:
The value referenced by the given key, if it exists in the cache. If not,
the value of `default` will be returned.
"""
logger.info("*** Retrieving key from primary_dict: %s", key)
return self._primary_dict.get(key, default)
def clear(self) -> None:
"""Evicts all entries from the cache."""
self._primary_dict.clear()
self._secondary_dict.clear()
def pop(
self, key: Union[PKT, SKT], default: Optional[Union[Dict[PKT, PVT], PVT]] = None
) -> Optional[Union[Dict[PKT, PVT], PVT]]:
"""Remove an entry from either the primary_dict or secondary_dict.
The primary_dict is checked first for the key. If an entry is found, it is
removed from the primary_dict and returned.
If no entry in the primary_dict exists, then the secondary_dict is checked.
If an entry exists, all associated entries in the primary_dict will be
deleted, and all primary_dict keys returned from this function in a SecondarySet.
Args:
key: A key to drop from either the primary_dict or secondary_dict.
default: The default value if the key does not exist in either dict.
Returns:
Either a matched value from the primary_dict or the secondary_dict. If no
value is found for the key, then None.
"""
# Attempt to remove from the primary_dict first.
primary_value = self._primary_dict.pop(key, SENTINEL)
if primary_value is not SENTINEL:
# We found a value in the primary_dict. Remove it from the corresponding
# entry in the secondary_dict, and then return it.
logger.info(
"*** Popped entry from primary_dict: %s: %s", key, primary_value
)
# Derive the secondary_key from the primary_value
secondary_key = self._secondary_key_function(primary_value)
# Pop the entry from the secondary_dict
secondary_key_set = self._secondary_dict[secondary_key]
if len(secondary_key_set) > 1:
# Delete just the set entry for the given key.
secondary_key_set.remove(key)
logger.info(
"*** Popping from secondary_dict: %s: %s", secondary_key, key
)
else:
# Delete the entire set referenced by the secondary_key, as it only
# has one entry.
del self._secondary_dict[secondary_key]
logger.info("*** Popping from secondary_dict: %s", secondary_key)
return primary_value
# There was no matching value in the primary_dict. Attempt the secondary_dict.
primary_key_set = self._secondary_dict.pop(key, SENTINEL)
if primary_key_set is not SENTINEL:
# We found a set in the secondary_dict.
logger.info(
"*** Found '%s' in secondary_dict: %s: ",
key,
primary_key_set,
)
popped_primary_dict_values: List[PVT] = []
# We found an entry in the secondary_dict. Delete all related entries in the
# primary_dict.
logger.info(
"*** Found key in secondary_dict to pop: %s. "
"Popping primary_dict entries",
key,
)
for primary_key in primary_key_set:
primary_value = self._primary_dict.pop(primary_key)
logger.info("*** Popping entry from primary_dict: %s - %s", primary_key, primary_value)
logger.info("*** primary_dict: %s", self._primary_dict)
popped_primary_dict_values.append(primary_value)
# Now return the unmodified copy of the set.
return popped_primary_dict_values
# No match in either dict.
return default
def values(self) -> ValuesView:
return self._primary_dict.values()
def items(self) -> ItemsView:
return self._primary_dict.items()
def __len__(self) -> int:
return len(self._primary_dict)

View File

@@ -46,8 +46,10 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces
from synapse.metrics.jemalloc import get_jemalloc_stats
from synapse.util import Clock, caches
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
from synapse.util.caches.dual_lookup_cache import DualLookupCache, SecondarySet
from synapse.util.caches.treecache import (
TreeCache,
TreeCacheNode,
iterate_tree_cache_entry,
iterate_tree_cache_items,
)
@@ -375,12 +377,13 @@ class LruCache(Generic[KT, VT]):
self,
max_size: int,
cache_name: Optional[str] = None,
cache_type: Type[Union[dict, TreeCache]] = dict,
cache_type: Type[Union[dict, TreeCache, DualLookupCache]] = dict,
size_callback: Optional[Callable[[VT], int]] = None,
metrics_collection_callback: Optional[Callable[[], None]] = None,
apply_cache_factor_from_config: bool = True,
clock: Optional[Clock] = None,
prune_unread_entries: bool = True,
dual_lookup_secondary_key_function: Optional[Callable[[Any], Any]] = None,
):
"""
Args:
@@ -411,6 +414,10 @@ class LruCache(Generic[KT, VT]):
prune_unread_entries: If True, cache entries that haven't been read recently
will be evicted from the cache in the background. Set to False to
opt-out of this behaviour.
# TODO: At this point we should probably just pass an initialised cache type
# to LruCache, no?
dual_lookup_secondary_key_function:
"""
# Default `clock` to something sensible. Note that we rename it to
# `real_clock` so that mypy doesn't think its still `Optional`.
@@ -419,7 +426,30 @@ class LruCache(Generic[KT, VT]):
else:
real_clock = clock
cache: Union[Dict[KT, _Node[KT, VT]], TreeCache] = cache_type()
# TODO: I've had to make this ugly to appease mypy :(
# Perhaps initialise the backing cache and then pass to LruCache?
cache: Union[Dict[KT, _Node[KT, VT]], TreeCache, DualLookupCache]
if cache_type is DualLookupCache:
# The dual_lookup_secondary_key_function is a function that's intended to
# extract a key from the value in the cache. Since we wrap values given to
# us in a _Node object, this function will actually operate on a _Node,
# instead of directly on the object type callers are expecting.
#
# Thus, we wrap the function given by the caller in another one that
# extracts the value from the _Node, before then handing it off to the
# given function for processing.
def key_function_wrapper(node: Any) -> Any:
assert dual_lookup_secondary_key_function is not None
return dual_lookup_secondary_key_function(node.value)
cache = DualLookupCache(
secondary_key_function=key_function_wrapper,
)
elif cache_type is TreeCache:
cache = TreeCache()
else:
cache = {}
self.cache = cache # Used for introspection.
self.apply_cache_factor_from_config = apply_cache_factor_from_config
@@ -722,13 +752,25 @@ class LruCache(Generic[KT, VT]):
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
popped = cache.pop(key, None)
if popped is None:
# Remove an entry from the cache.
# In the case of a 'dict' cache type, we're just removing an entry from the
# dict. For a TreeCache, we're removing a subtree which has children.
popped_entry: _Node[KT, VT] = cache.pop(key, None)
if popped_entry is None:
return
# for each deleted node, we now need to remove it from the linked list
# and run its callbacks.
for leaf in iterate_tree_cache_entry(popped):
delete_node(leaf)
if isinstance(popped_entry, TreeCacheNode):
# We've popped a subtree from a TreeCache - now we need to clean up
# each child node.
for leaf in iterate_tree_cache_entry(popped_entry):
# For each deleted child node, we remove it from the linked list and
# run its callbacks.
delete_node(leaf)
elif isinstance(popped_entry, SecondarySet):
for leaf in popped_entry:
delete_node(leaf)
else:
delete_node(popped_entry)
@synchronized
def cache_clear() -> None:

View File

@@ -115,6 +115,5 @@ class PurgeTests(HomeserverTestCase):
)
# The events aren't found.
self.store._invalidate_local_get_event_cache(create_event.event_id)
self.get_failure(self.store.get_event(create_event.event_id), NotFoundError)
self.get_failure(self.store.get_event(first["event_id"]), NotFoundError)