mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-09 01:30:18 +00:00
Compare commits
5 Commits
anoa/codex
...
clokep/eri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
09825c2775 | ||
|
|
fcc4dc7181 | ||
|
|
1780a70748 | ||
|
|
f3b7940e14 | ||
|
|
53e83c76b2 |
21
Cargo.lock
generated
21
Cargo.lock
generated
@@ -102,6 +102,15 @@ version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "adab1eaa3408fb7f0c777a73e7465fd5656136fc93b670eb6df3c88c2c1344e3"
|
||||
|
||||
[[package]]
|
||||
name = "intrusive-collections"
|
||||
version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b694dc9f70c3bda874626d2aed13b780f137aab435f4e9814121955cf706122e"
|
||||
dependencies = [
|
||||
"memoffset 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.4"
|
||||
@@ -151,6 +160,15 @@ dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.15.0"
|
||||
@@ -199,7 +217,7 @@ dependencies = [
|
||||
"cfg-if",
|
||||
"indoc",
|
||||
"libc",
|
||||
"memoffset",
|
||||
"memoffset 0.6.5",
|
||||
"parking_lot",
|
||||
"pyo3-build-config",
|
||||
"pyo3-ffi",
|
||||
@@ -402,6 +420,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"blake2",
|
||||
"hex",
|
||||
"intrusive-collections",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"pyo3",
|
||||
|
||||
1
changelog.d/13733.misc
Normal file
1
changelog.d/13733.misc
Normal file
@@ -0,0 +1 @@
|
||||
Convert `LruCache` linked lists into Rust.
|
||||
@@ -23,6 +23,7 @@ name = "synapse.synapse_rust"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.63"
|
||||
intrusive-collections = "0.9.4"
|
||||
lazy_static = "1.4.0"
|
||||
log = "0.4.17"
|
||||
pyo3 = { version = "0.17.1", features = [
|
||||
|
||||
@@ -17,6 +17,8 @@ fn get_rust_file_digest() -> &'static str {
|
||||
env!("SYNAPSE_RUST_DIGEST")
|
||||
}
|
||||
|
||||
mod lru_cache;
|
||||
|
||||
/// Formats the sum of two numbers as string.
|
||||
#[pyfunction]
|
||||
#[pyo3(text_signature = "(a, b, /)")]
|
||||
@@ -42,5 +44,6 @@ fn synapse_rust(py: Python<'_>, m: &PyModule) -> PyResult<()> {
|
||||
acl::register_module(py, m)?;
|
||||
push::register_module(py, m)?;
|
||||
|
||||
lru_cache::register_module(py, m)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
250
rust/src/lru_cache.rs
Normal file
250
rust/src/lru_cache.rs
Normal file
@@ -0,0 +1,250 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use intrusive_collections::{intrusive_adapter, LinkedListAtomicLink};
|
||||
use intrusive_collections::{LinkedList, LinkedListLink};
|
||||
use lazy_static::lazy_static;
|
||||
use log::error;
|
||||
use pyo3::prelude::*;
|
||||
use pyo3::types::PySet;
|
||||
|
||||
/// Called when registering modules with python.
|
||||
pub fn register_module(py: Python<'_>, m: &PyModule) -> PyResult<()> {
|
||||
let child_module = PyModule::new(py, "push")?;
|
||||
child_module.add_class::<LruCacheNode>()?;
|
||||
child_module.add_class::<PerCacheLinkedList>()?;
|
||||
child_module.add_function(wrap_pyfunction!(get_global_list, m)?)?;
|
||||
|
||||
m.add_submodule(child_module)?;
|
||||
|
||||
// We need to manually add the module to sys.modules to make `from
|
||||
// synapse.synapse_rust import push` work.
|
||||
py.import("sys")?
|
||||
.getattr("modules")?
|
||||
.set_item("synapse.synapse_rust.lru_cache", child_module)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[pyclass]
|
||||
#[derive(Clone)]
|
||||
struct PerCacheLinkedList(Arc<Mutex<LinkedList<LruCacheNodeAdapterPerCache>>>);
|
||||
|
||||
#[pymethods]
|
||||
impl PerCacheLinkedList {
|
||||
#[new]
|
||||
fn new() -> PerCacheLinkedList {
|
||||
PerCacheLinkedList(Default::default())
|
||||
}
|
||||
|
||||
fn get_back(&self) -> Option<LruCacheNode> {
|
||||
let list = self.0.lock().expect("poisoned");
|
||||
list.back().clone_pointer().map(|n| LruCacheNode(n))
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared state behind a cache node.
///
/// A node is linked into two intrusive lists: the per-cache list it was
/// created with, and the process-wide `GLOBAL_LIST`.
struct LruCacheNodeInner {
    // Intrusive link for the per-cache list.
    per_cache_link: LinkedListAtomicLink,
    // Intrusive link for the global list.
    global_list_link: LinkedListAtomicLink,
    // The per-cache list this node belongs to.
    per_cache_list: Arc<Mutex<LinkedList<LruCacheNodeAdapterPerCache>>>,
    // The Python cache object; taken (set to `None`) once the node has been
    // dropped from the cache, so `drop_from_cache` is idempotent.
    cache: Mutex<Option<PyObject>>,
    // The cache key (immutable after construction).
    key: PyObject,
    // The cached value; replaceable via the `value` setter.
    value: Arc<Mutex<PyObject>>,
    // Python set of zero-arg callbacks run when the node is deleted.
    callbacks: Py<PySet>,
    // NOTE(review): presumably the estimated memory footprint of the entry,
    // in bytes, mirroring `_Node.memory` on the Python side — confirm.
    memory: usize,
    // Timestamp (seconds) of the last access.
    last_access_ts_secs: usize,
}
|
||||
|
||||
impl LruCacheNodeInner {
    /// Record a new last-access time, in seconds.
    ///
    /// NOTE(review): this takes `&mut self`, but nodes are only ever shared
    /// via `Arc`, so this is currently unreachable from the Python-facing
    /// API (see the TODO in `move_to_front`).
    fn update_last_access(&mut self, ts_secs: usize) {
        self.last_access_ts_secs = ts_secs;
    }
}
|
||||
|
||||
/// Python-facing handle to a cache node; shares the underlying
/// `LruCacheNodeInner` via `Arc`.
#[pyclass]
struct LruCacheNode(Arc<LruCacheNodeInner>);
|
||||
|
||||
#[pymethods]
|
||||
impl LruCacheNode {
|
||||
#[new]
|
||||
fn py_new(
|
||||
cache: PyObject,
|
||||
cache_list: PerCacheLinkedList,
|
||||
key: PyObject,
|
||||
value: PyObject,
|
||||
callbacks: Py<PySet>,
|
||||
memory: usize,
|
||||
ts_secs: usize,
|
||||
) -> Self {
|
||||
let node = Arc::new(LruCacheNodeInner {
|
||||
per_cache_link: Default::default(),
|
||||
global_list_link: Default::default(),
|
||||
per_cache_list: cache_list.0,
|
||||
cache: Mutex::new(Some(cache)),
|
||||
key,
|
||||
value: Arc::new(Mutex::new(value)),
|
||||
callbacks,
|
||||
memory,
|
||||
last_access_ts_secs: ts_secs,
|
||||
});
|
||||
|
||||
GLOBAL_LIST
|
||||
.lock()
|
||||
.expect("posioned")
|
||||
.push_front(node.clone());
|
||||
|
||||
node.per_cache_list
|
||||
.lock()
|
||||
.expect("posioned")
|
||||
.push_front(node.clone());
|
||||
|
||||
LruCacheNode(node)
|
||||
}
|
||||
|
||||
fn add_callbacks(&self, py: Python<'_>, new_callbacks: &PyAny) -> PyResult<()> {
|
||||
if new_callbacks.len()? == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let current_callbacks = self.0.callbacks.as_ref(py);
|
||||
|
||||
for cb in new_callbacks.iter()? {
|
||||
current_callbacks.add(cb?)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run_and_clear_callbacks(&self, py: Python<'_>) {
|
||||
let callbacks = self.0.callbacks.as_ref(py);
|
||||
|
||||
if callbacks.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
for callback in callbacks {
|
||||
if let Err(err) = callback.call0() {
|
||||
error!("LruCacheNode callback errored: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
callbacks.clear();
|
||||
}
|
||||
|
||||
fn drop_from_cache(&self) -> PyResult<()> {
|
||||
let cache = self.0.cache.lock().expect("poisoned").take();
|
||||
|
||||
if let Some(cache) = cache {
|
||||
Python::with_gil(|py| cache.call_method1(py, "pop", (&self.0.key, None::<()>)))?;
|
||||
}
|
||||
|
||||
self.drop_from_lists();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn drop_from_lists(&self) {
|
||||
if self.0.global_list_link.is_linked() {
|
||||
let mut glboal_list = GLOBAL_LIST.lock().expect("poisoned");
|
||||
|
||||
let mut curor_mut = unsafe {
|
||||
// Getting the cursor is unsafe as we need to ensure the list link
|
||||
// belongs to the given list.
|
||||
glboal_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
|
||||
};
|
||||
|
||||
curor_mut.remove();
|
||||
}
|
||||
|
||||
if self.0.per_cache_link.is_linked() {
|
||||
let mut per_cache_list = self.0.per_cache_list.lock().expect("poisoned");
|
||||
|
||||
let mut curor_mut = unsafe {
|
||||
// Getting the cursor is unsafe as we need to ensure the list link
|
||||
// belongs to the given list.
|
||||
per_cache_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
|
||||
};
|
||||
|
||||
curor_mut.remove();
|
||||
}
|
||||
}
|
||||
|
||||
fn move_to_front(&self, ts_secs: usize) {
|
||||
if self.0.global_list_link.is_linked() {
|
||||
let mut global_list = GLOBAL_LIST.lock().expect("poisoned");
|
||||
|
||||
let mut curor_mut = unsafe {
|
||||
// Getting the cursor is unsafe as we need to ensure the list link
|
||||
// belongs to the given list.
|
||||
global_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
|
||||
};
|
||||
curor_mut.remove();
|
||||
|
||||
global_list.push_front(self.0.clone());
|
||||
|
||||
// TODO Update self.0.last_access_ts_secs
|
||||
}
|
||||
|
||||
if self.0.per_cache_link.is_linked() {
|
||||
let mut per_cache_list = self.0.per_cache_list.lock().expect("poisoned");
|
||||
|
||||
let mut curor_mut = unsafe {
|
||||
// Getting the cursor is unsafe as we need to ensure the list link
|
||||
// belongs to the given list.
|
||||
per_cache_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
|
||||
};
|
||||
|
||||
curor_mut.remove();
|
||||
|
||||
per_cache_list.push_front(self.0.clone());
|
||||
}
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn key(&self) -> &PyObject {
|
||||
&self.0.key
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn value(&self) -> PyObject {
|
||||
self.0.value.lock().expect("poisoned").clone()
|
||||
}
|
||||
|
||||
#[setter]
|
||||
fn set_value(&self, value: PyObject) {
|
||||
*self.0.value.lock().expect("poisoned") = value
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn memory(&self) -> usize {
|
||||
self.0.memory
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn last_access_ts_secs(&self) -> usize { self.0.last_access_ts_secs }
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn get_global_list() -> Vec<LruCacheNode> {
|
||||
let list = GLOBAL_LIST.lock().expect("poisoned");
|
||||
|
||||
let mut vec = Vec::new();
|
||||
|
||||
let mut cursor = list.front();
|
||||
|
||||
while let Some(n) = cursor.clone_pointer() {
|
||||
vec.push(LruCacheNode(n));
|
||||
|
||||
cursor.move_next();
|
||||
}
|
||||
|
||||
vec
|
||||
}
|
||||
|
||||
intrusive_adapter!(LruCacheNodeAdapterPerCache = Arc<LruCacheNodeInner>: LruCacheNodeInner { per_cache_link: LinkedListLink });
|
||||
intrusive_adapter!(LruCacheNodeAdapterGlobal = Arc<LruCacheNodeInner>: LruCacheNodeInner { global_list_link: LinkedListLink });
|
||||
|
||||
lazy_static! {
|
||||
static ref GLOBAL_LIST_ADAPTER: LruCacheNodeAdapterGlobal = LruCacheNodeAdapterGlobal::new();
|
||||
static ref GLOBAL_LIST: Arc<Mutex<LinkedList<LruCacheNodeAdapterGlobal>>> =
|
||||
Arc::new(Mutex::new(LinkedList::new(GLOBAL_LIST_ADAPTER.clone())));
|
||||
}
|
||||
48
stubs/synapse/synapse_rust/lru_cache.pyi
Normal file
48
stubs/synapse/synapse_rust/lru_cache.pyi
Normal file
@@ -0,0 +1,48 @@
|
||||
# Copyright 2023 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import Callable, Generic, List, Optional, Set, TypeVar, Collection
|
||||
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
|
||||
# Key and Value type for the cache
|
||||
KT = TypeVar("KT")
|
||||
VT = TypeVar("VT")
|
||||
|
||||
class LruCacheNode(Generic[KT, VT]):
    """Stub for the Rust-backed LRU cache node (rust/src/lru_cache.rs)."""

    # The cache key.
    key: KT
    # The cached value.
    value: VT
    # Estimated memory footprint of the entry.
    memory: int
    # Timestamp (seconds) of the last access.
    last_access_ts_secs: int

    def __init__(
        self,
        cache: LruCache,
        cache_list: "PerCacheLinkedList",
        key: object,
        value: object,
        callbacks: Set[Callable[[], None]],
        memory: int,
        ts_secs: int,
    ) -> None: ...
    def add_callbacks(self, new_callbacks: Collection[Callable[[], None]]) -> None: ...
    def run_and_clear_callbacks(self) -> None: ...
    def drop_from_cache(self) -> None: ...
    def drop_from_lists(self) -> None: ...
    def move_to_front(self, ts_secs: int) -> None: ...
|
||||
|
||||
class PerCacheLinkedList(Generic[KT, VT]):
    """Stub for the Rust-backed per-cache linked list of cache nodes."""

    def __init__(self) -> None: ...
    def get_back(self) -> Optional[LruCacheNode[KT, VT]]: ...
|
||||
|
||||
# Snapshot of the global list of cache nodes, front to back.
def get_global_list() -> List[LruCacheNode]: ...
|
||||
@@ -44,6 +44,11 @@ from twisted.internet.interfaces import IReactorTime
|
||||
from synapse.config import cache as cache_config
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.metrics.jemalloc import get_jemalloc_stats
|
||||
from synapse.synapse_rust.lru_cache import (
|
||||
LruCacheNode,
|
||||
PerCacheLinkedList,
|
||||
get_global_list,
|
||||
)
|
||||
from synapse.util import Clock, caches
|
||||
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
|
||||
from synapse.util.caches.treecache import (
|
||||
@@ -51,7 +56,6 @@ from synapse.util.caches.treecache import (
|
||||
iterate_tree_cache_entry,
|
||||
iterate_tree_cache_items,
|
||||
)
|
||||
from synapse.util.linked_list import ListNode
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -94,22 +98,10 @@ VT = TypeVar("VT")
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class _TimedListNode(ListNode[T]):
|
||||
"""A `ListNode` that tracks last access time."""
|
||||
|
||||
__slots__ = ["last_access_ts_secs"]
|
||||
|
||||
def update_last_access(self, clock: Clock) -> None:
|
||||
self.last_access_ts_secs = int(clock.time())
|
||||
|
||||
|
||||
# Whether to insert new cache entries to the global list. We only add to it if
|
||||
# time based eviction is enabled.
|
||||
USE_GLOBAL_LIST = False
|
||||
|
||||
# A linked list of all cache entries, allowing efficient time based eviction.
|
||||
GLOBAL_ROOT = ListNode["_Node"].create_root_node()
|
||||
|
||||
|
||||
@wrap_as_background_process("LruCache._expire_old_entries")
|
||||
async def _expire_old_entries(
|
||||
@@ -123,9 +115,12 @@ async def _expire_old_entries(
|
||||
target_cache_memory_usage = autotune_config["target_cache_memory_usage"]
|
||||
min_cache_ttl = autotune_config["min_cache_ttl"] / 1000
|
||||
|
||||
# A linked list of all cache entries, allowing efficient time based eviction.
|
||||
global_root = get_global_list()
|
||||
|
||||
now = int(clock.time())
|
||||
node = GLOBAL_ROOT.prev_node
|
||||
assert node is not None
|
||||
assert len(global_root) > 0
|
||||
node = global_root[0]
|
||||
|
||||
i = 0
|
||||
|
||||
@@ -147,10 +142,7 @@ async def _expire_old_entries(
|
||||
"Unable to read allocated memory, skipping memory-based cache eviction."
|
||||
)
|
||||
|
||||
while node is not GLOBAL_ROOT:
|
||||
# Only the root node isn't a `_TimedListNode`.
|
||||
assert isinstance(node, _TimedListNode)
|
||||
|
||||
for node in global_root[1:]:
|
||||
# if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's
|
||||
# nothing to do here
|
||||
if (
|
||||
@@ -237,125 +229,6 @@ def setup_expire_lru_cache_entries(hs: "HomeServer") -> None:
|
||||
)
|
||||
|
||||
|
||||
class _Node(Generic[KT, VT]):
|
||||
__slots__ = [
|
||||
"_list_node",
|
||||
"_global_list_node",
|
||||
"_cache",
|
||||
"key",
|
||||
"value",
|
||||
"callbacks",
|
||||
"memory",
|
||||
]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
root: "ListNode[_Node]",
|
||||
key: KT,
|
||||
value: VT,
|
||||
cache: "weakref.ReferenceType[LruCache[KT, VT]]",
|
||||
clock: Clock,
|
||||
callbacks: Collection[Callable[[], None]] = (),
|
||||
prune_unread_entries: bool = True,
|
||||
):
|
||||
self._list_node = ListNode.insert_after(self, root)
|
||||
self._global_list_node: Optional[_TimedListNode] = None
|
||||
if USE_GLOBAL_LIST and prune_unread_entries:
|
||||
self._global_list_node = _TimedListNode.insert_after(self, GLOBAL_ROOT)
|
||||
self._global_list_node.update_last_access(clock)
|
||||
|
||||
# We store a weak reference to the cache object so that this _Node can
|
||||
# remove itself from the cache. If the cache is dropped we ensure we
|
||||
# remove our entries in the lists.
|
||||
self._cache = cache
|
||||
|
||||
self.key = key
|
||||
self.value = value
|
||||
|
||||
# Set of callbacks to run when the node gets deleted. We store as a list
|
||||
# rather than a set to keep memory usage down (and since we expect few
|
||||
# entries per node, the performance of checking for duplication in a
|
||||
# list vs using a set is negligible).
|
||||
#
|
||||
# Note that we store this as an optional list to keep the memory
|
||||
# footprint down. Storing `None` is free as its a singleton, while empty
|
||||
# lists are 56 bytes (and empty sets are 216 bytes, if we did the naive
|
||||
# thing and used sets).
|
||||
self.callbacks: Optional[List[Callable[[], None]]] = None
|
||||
|
||||
self.add_callbacks(callbacks)
|
||||
|
||||
self.memory = 0
|
||||
if caches.TRACK_MEMORY_USAGE:
|
||||
self.memory = (
|
||||
_get_size_of(key)
|
||||
+ _get_size_of(value)
|
||||
+ _get_size_of(self._list_node, recurse=False)
|
||||
+ _get_size_of(self.callbacks, recurse=False)
|
||||
+ _get_size_of(self, recurse=False)
|
||||
)
|
||||
self.memory += _get_size_of(self.memory, recurse=False)
|
||||
|
||||
if self._global_list_node:
|
||||
self.memory += _get_size_of(self._global_list_node, recurse=False)
|
||||
self.memory += _get_size_of(self._global_list_node.last_access_ts_secs)
|
||||
|
||||
def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None:
|
||||
"""Add to stored list of callbacks, removing duplicates."""
|
||||
|
||||
if not callbacks:
|
||||
return
|
||||
|
||||
if not self.callbacks:
|
||||
self.callbacks = []
|
||||
|
||||
for callback in callbacks:
|
||||
if callback not in self.callbacks:
|
||||
self.callbacks.append(callback)
|
||||
|
||||
def run_and_clear_callbacks(self) -> None:
|
||||
"""Run all callbacks and clear the stored list of callbacks. Used when
|
||||
the node is being deleted.
|
||||
"""
|
||||
|
||||
if not self.callbacks:
|
||||
return
|
||||
|
||||
for callback in self.callbacks:
|
||||
callback()
|
||||
|
||||
self.callbacks = None
|
||||
|
||||
def drop_from_cache(self) -> None:
|
||||
"""Drop this node from the cache.
|
||||
|
||||
Ensures that the entry gets removed from the cache and that we get
|
||||
removed from all lists.
|
||||
"""
|
||||
cache = self._cache()
|
||||
if (
|
||||
cache is None
|
||||
or cache.pop(self.key, _Sentinel.sentinel) is _Sentinel.sentinel
|
||||
):
|
||||
# `cache.pop` should call `drop_from_lists()`, unless this Node had
|
||||
# already been removed from the cache.
|
||||
self.drop_from_lists()
|
||||
|
||||
def drop_from_lists(self) -> None:
|
||||
"""Remove this node from the cache lists."""
|
||||
self._list_node.remove_from_list()
|
||||
|
||||
if self._global_list_node:
|
||||
self._global_list_node.remove_from_list()
|
||||
|
||||
def move_to_front(self, clock: Clock, cache_list_root: ListNode) -> None:
|
||||
"""Moves this node to the front of all the lists its in."""
|
||||
self._list_node.move_after(cache_list_root)
|
||||
if self._global_list_node:
|
||||
self._global_list_node.move_after(GLOBAL_ROOT)
|
||||
self._global_list_node.update_last_access(clock)
|
||||
|
||||
|
||||
class _Sentinel(Enum):
|
||||
# defining a sentinel in this way allows mypy to correctly handle the
|
||||
# type of a dictionary lookup.
|
||||
@@ -417,7 +290,7 @@ class LruCache(Generic[KT, VT]):
|
||||
else:
|
||||
real_clock = clock
|
||||
|
||||
cache: Union[Dict[KT, _Node[KT, VT]], TreeCache] = cache_type()
|
||||
cache: "Union[Dict[KT, LruCacheNode[KT, VT]], TreeCache]" = cache_type()
|
||||
self.cache = cache # Used for introspection.
|
||||
self.apply_cache_factor_from_config = apply_cache_factor_from_config
|
||||
|
||||
@@ -449,30 +322,24 @@ class LruCache(Generic[KT, VT]):
|
||||
self.metrics = metrics
|
||||
|
||||
# We create a single weakref to self here so that we don't need to keep
|
||||
# creating more each time we create a `_Node`.
|
||||
# creating more each time we create a `LruCacheNode`.
|
||||
weak_ref_to_self = weakref.ref(self)
|
||||
|
||||
list_root = ListNode[_Node[KT, VT]].create_root_node()
|
||||
rust_linked_list: "PerCacheLinkedList[KT, VT]" = PerCacheLinkedList()
|
||||
|
||||
lock = threading.Lock()
|
||||
|
||||
def evict() -> None:
|
||||
while cache_len() > self.max_size:
|
||||
# Get the last node in the list (i.e. the oldest node).
|
||||
todelete = list_root.prev_node
|
||||
todelete = rust_linked_list.get_back()
|
||||
|
||||
# The list root should always have a valid `prev_node` if the
|
||||
# cache is not empty.
|
||||
assert todelete is not None
|
||||
|
||||
# The node should always have a reference to a cache entry, as
|
||||
# we only drop the cache entry when we remove the node from the
|
||||
# list.
|
||||
node = todelete.get_cache_entry()
|
||||
assert node is not None
|
||||
|
||||
evicted_len = delete_node(node)
|
||||
cache.pop(node.key, None)
|
||||
evicted_len = delete_node(todelete)
|
||||
cache.pop(todelete.key, None)
|
||||
if metrics:
|
||||
metrics.inc_evictions(EvictionReason.size, evicted_len)
|
||||
|
||||
@@ -500,14 +367,14 @@ class LruCache(Generic[KT, VT]):
|
||||
def add_node(
|
||||
key: KT, value: VT, callbacks: Collection[Callable[[], None]] = ()
|
||||
) -> None:
|
||||
node: _Node[KT, VT] = _Node(
|
||||
list_root,
|
||||
node: "LruCacheNode[KT, VT]" = LruCacheNode(
|
||||
self,
|
||||
rust_linked_list,
|
||||
key,
|
||||
value,
|
||||
weak_ref_to_self,
|
||||
real_clock,
|
||||
callbacks,
|
||||
prune_unread_entries,
|
||||
set(callbacks),
|
||||
0,
|
||||
int(real_clock.time()),
|
||||
)
|
||||
cache[key] = node
|
||||
|
||||
@@ -517,10 +384,10 @@ class LruCache(Generic[KT, VT]):
|
||||
if caches.TRACK_MEMORY_USAGE and metrics:
|
||||
metrics.inc_memory_usage(node.memory)
|
||||
|
||||
def move_node_to_front(node: _Node[KT, VT]) -> None:
|
||||
node.move_to_front(real_clock, list_root)
|
||||
def move_node_to_front(node: "LruCacheNode[KT, VT]") -> None:
|
||||
node.move_to_front(int(real_clock.time()))
|
||||
|
||||
def delete_node(node: _Node[KT, VT]) -> int:
|
||||
def delete_node(node: "LruCacheNode[KT, VT]") -> int:
|
||||
node.drop_from_lists()
|
||||
|
||||
deleted_len = 1
|
||||
@@ -639,7 +506,7 @@ class LruCache(Generic[KT, VT]):
|
||||
if update_metrics and metrics:
|
||||
metrics.inc_hits()
|
||||
|
||||
# We store entries in the `TreeCache` with values of type `_Node`,
|
||||
# We store entries in the `TreeCache` with values of type `LruCacheNode`,
|
||||
# which we need to unwrap.
|
||||
return (
|
||||
(full_key, lru_node.value)
|
||||
@@ -734,8 +601,8 @@ class LruCache(Generic[KT, VT]):
|
||||
node.run_and_clear_callbacks()
|
||||
node.drop_from_lists()
|
||||
|
||||
assert list_root.next_node == list_root
|
||||
assert list_root.prev_node == list_root
|
||||
# assert list_root.next_node == list_root
|
||||
# assert list_root.prev_node == list_root
|
||||
|
||||
cache.clear()
|
||||
if size_callback:
|
||||
|
||||
@@ -1,150 +0,0 @@
|
||||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""A circular doubly linked list implementation.
|
||||
"""
|
||||
|
||||
import threading
|
||||
from typing import Generic, Optional, Type, TypeVar
|
||||
|
||||
P = TypeVar("P")
|
||||
LN = TypeVar("LN", bound="ListNode")
|
||||
|
||||
|
||||
class ListNode(Generic[P]):
    """A node in a circular doubly linked list, with an (optional) reference to
    a cache entry.

    The reference should only be `None` for the root node or if the node has
    been removed from the list.
    """

    # A lock to protect mutating the list prev/next pointers. Note this is a
    # single class-level lock shared by every list in the process.
    _LOCK = threading.Lock()

    # We don't use attrs here as in py3.6 you can't have `attr.s(slots=True)`
    # and inherit from `Generic` for some reason
    __slots__ = [
        "cache_entry",
        "prev_node",
        "next_node",
    ]

    def __init__(self, cache_entry: Optional[P] = None) -> None:
        # The associated cache entry; `None` for the root node or once this
        # node has been removed from its list.
        self.cache_entry = cache_entry
        # Neighbouring nodes; both `None` means "not currently in a list".
        self.prev_node: Optional[ListNode[P]] = None
        self.next_node: Optional[ListNode[P]] = None

    @classmethod
    def create_root_node(cls: Type["ListNode[P]"]) -> "ListNode[P]":
        """Create a new linked list by creating a "root" node, which is a node
        that has prev_node/next_node pointing to itself and no associated cache
        entry.
        """
        root = cls()
        root.prev_node = root
        root.next_node = root
        return root

    @classmethod
    def insert_after(
        cls: Type[LN],
        cache_entry: P,
        node: "ListNode[P]",
    ) -> LN:
        """Create a new list node that is placed after the given node.

        Args:
            cache_entry: The associated cache entry.
            node: The existing node in the list to insert the new entry after.
        """
        new_node = cls(cache_entry)
        with cls._LOCK:
            new_node._refs_insert_after(node)
        return new_node

    def remove_from_list(self) -> None:
        """Remove this node from the list."""
        with self._LOCK:
            self._refs_remove_node_from_list()

        # We drop the reference to the cache entry to break the reference cycle
        # between the list node and cache entry, allowing the two to be dropped
        # immediately rather than at the next GC.
        self.cache_entry = None

    def move_after(self, node: "ListNode[P]") -> None:
        """Move this node from its current location in the list to after the
        given node.
        """
        with self._LOCK:
            # We assert that both this node and the target node is still "alive".
            assert self.prev_node
            assert self.next_node
            assert node.prev_node
            assert node.next_node

            # Moving a node after itself would corrupt the pointers below.
            assert self is not node

            # Remove self from the list
            self._refs_remove_node_from_list()

            # Insert self back into the list, after target node
            self._refs_insert_after(node)

    def _refs_remove_node_from_list(self) -> None:
        """Internal method to *just* remove the node from the list, without
        e.g. clearing out the cache entry.

        Caller must hold `_LOCK`.
        """
        if self.prev_node is None or self.next_node is None:
            # We've already been removed from the list.
            return

        prev_node = self.prev_node
        next_node = self.next_node

        # Splice our neighbours together.
        prev_node.next_node = next_node
        next_node.prev_node = prev_node

        # We set these to None so that we don't get circular references,
        # allowing us to be dropped without having to go via the GC.
        self.prev_node = None
        self.next_node = None

    def _refs_insert_after(self, node: "ListNode[P]") -> None:
        """Internal method to insert the node after the given node.

        Caller must hold `_LOCK`.
        """

        # This method should only be called when we're not already in the list.
        assert self.prev_node is None
        assert self.next_node is None

        # We expect the given node to be in the list and thus have valid
        # prev/next refs.
        assert node.next_node
        assert node.prev_node

        prev_node = node
        next_node = node.next_node

        # Point ourselves at our new neighbours, then them back at us.
        self.prev_node = prev_node
        self.next_node = next_node

        prev_node.next_node = self
        next_node.prev_node = self

    def get_cache_entry(self) -> Optional[P]:
        """Get the cache entry, returns None if this is the root node (i.e.
        cache_entry is None) or if the entry has been dropped.
        """
        return self.cache_entry
|
||||
Reference in New Issue
Block a user