Compare commits

...

3 Commits

Author SHA1 Message Date
Erik Johnston
1780a70748 Newsfile 2022-09-09 16:26:14 +01:00
Erik Johnston
f3b7940e14 More stuff 2022-09-09 16:22:46 +01:00
Erik Johnston
53e83c76b2 SNAPSHOT 2022-09-09 16:22:45 +01:00
6 changed files with 260 additions and 17 deletions

View File

@@ -3,3 +3,7 @@
[workspace]
members = ["rust"]
[profile.release]
debug = true

1
changelog.d/13733.misc Normal file
View File

@@ -0,0 +1 @@
Convert `LruCache` linked lists into Rust.

View File

@@ -18,4 +18,7 @@ crate-type = ["cdylib"]
name = "synapse.synapse_rust"
[dependencies]
intrusive-collections = "0.9.4"
lazy_static = "1.4.0"
log = "0.4.17"
pyo3 = { version = "0.16.5", features = ["extension-module", "macros", "abi3", "abi3-py37"] }

View File

@@ -1,5 +1,7 @@
use pyo3::prelude::*;
mod lru_cache;
/// Formats the sum of two numbers as string.
#[pyfunction]
#[pyo3(text_signature = "(a, b, /)")]
@@ -9,8 +11,9 @@ fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
/// The entry point for defining the Python module.
#[pymodule]
fn synapse_rust(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
fn synapse_rust(py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
lru_cache::register_module(py, m)?;
Ok(())
}

236
rust/src/lru_cache.rs Normal file
View File

@@ -0,0 +1,236 @@
use std::sync::{Arc, Mutex};
use intrusive_collections::{intrusive_adapter, LinkedListAtomicLink};
use intrusive_collections::{LinkedList, LinkedListLink};
use lazy_static::lazy_static;
use log::error;
use pyo3::prelude::*;
use pyo3::types::PySet;
/// Called when registering modules with python.
pub fn register_module(py: Python<'_>, m: &PyModule) -> PyResult<()> {
let child_module = PyModule::new(py, "push")?;
child_module.add_class::<LruCacheNode>()?;
child_module.add_class::<PerCacheLinkedList>()?;
child_module.add_function(wrap_pyfunction!(get_global_list, m)?)?;
m.add_submodule(child_module)?;
// We need to manually add the module to sys.modules to make `from
// synapse.synapse_rust import push` work.
py.import("sys")?
.getattr("modules")?
.set_item("synapse.synapse_rust.lru_cache", child_module)?;
Ok(())
}
#[pyclass]
#[derive(Clone)]
struct PerCacheLinkedList(Arc<Mutex<LinkedList<LruCacheNodeAdapterPerCache>>>);
#[pymethods]
impl PerCacheLinkedList {
#[new]
fn new() -> PerCacheLinkedList {
PerCacheLinkedList(Default::default())
}
fn get_back(&self) -> Option<LruCacheNode> {
let list = self.0.lock().expect("poisoned");
list.back().clone_pointer().map(|n| LruCacheNode(n))
}
}
struct LruCacheNodeInner {
per_cache_link: LinkedListAtomicLink,
global_list_link: LinkedListAtomicLink,
per_cache_list: Arc<Mutex<LinkedList<LruCacheNodeAdapterPerCache>>>,
cache: Mutex<Option<PyObject>>,
key: PyObject,
value: Arc<Mutex<PyObject>>,
callbacks: Py<PySet>,
memory: usize,
}
#[pyclass]
struct LruCacheNode(Arc<LruCacheNodeInner>);
#[pymethods]
impl LruCacheNode {
#[new]
fn py_new(
cache: PyObject,
cache_list: PerCacheLinkedList,
key: PyObject,
value: PyObject,
callbacks: Py<PySet>,
memory: usize,
) -> Self {
let node = Arc::new(LruCacheNodeInner {
per_cache_link: Default::default(),
global_list_link: Default::default(),
per_cache_list: cache_list.0,
cache: Mutex::new(Some(cache)),
key,
value: Arc::new(Mutex::new(value)),
callbacks,
memory,
});
GLOBAL_LIST
.lock()
.expect("posioned")
.push_front(node.clone());
node.per_cache_list
.lock()
.expect("posioned")
.push_front(node.clone());
LruCacheNode(node)
}
fn add_callbacks(&self, py: Python<'_>, new_callbacks: &PyAny) -> PyResult<()> {
if new_callbacks.len()? == 0 {
return Ok(());
}
let current_callbacks = self.0.callbacks.as_ref(py);
for cb in new_callbacks.iter()? {
current_callbacks.add(cb?)?;
}
Ok(())
}
fn run_and_clear_callbacks(&self, py: Python<'_>) {
let callbacks = self.0.callbacks.as_ref(py);
if callbacks.is_empty() {
return;
}
for callback in callbacks {
if let Err(err) = callback.call0() {
error!("LruCacheNode callback errored: {err}");
}
}
callbacks.clear();
}
fn drop_from_cache(&self) -> PyResult<()> {
let cache = self.0.cache.lock().expect("poisoned").take();
if let Some(cache) = cache {
Python::with_gil(|py| cache.call_method1(py, "pop", (&self.0.key, None::<()>)))?;
}
self.drop_from_lists();
Ok(())
}
fn drop_from_lists(&self) {
if self.0.global_list_link.is_linked() {
let mut glboal_list = GLOBAL_LIST.lock().expect("poisoned");
let mut curor_mut = unsafe {
// Getting the cursor is unsafe as we need to ensure the list link
// belongs to the given list.
glboal_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
};
curor_mut.remove();
}
if self.0.per_cache_link.is_linked() {
let mut per_cache_list = self.0.per_cache_list.lock().expect("poisoned");
let mut curor_mut = unsafe {
// Getting the cursor is unsafe as we need to ensure the list link
// belongs to the given list.
per_cache_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
};
curor_mut.remove();
}
}
fn move_to_front(&self) {
if self.0.global_list_link.is_linked() {
let mut global_list = GLOBAL_LIST.lock().expect("poisoned");
let mut curor_mut = unsafe {
// Getting the cursor is unsafe as we need to ensure the list link
// belongs to the given list.
global_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
};
curor_mut.remove();
global_list.push_front(self.0.clone());
}
if self.0.per_cache_link.is_linked() {
let mut per_cache_list = self.0.per_cache_list.lock().expect("poisoned");
let mut curor_mut = unsafe {
// Getting the cursor is unsafe as we need to ensure the list link
// belongs to the given list.
per_cache_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
};
curor_mut.remove();
per_cache_list.push_front(self.0.clone());
}
}
#[getter]
fn key(&self) -> &PyObject {
&self.0.key
}
#[getter]
fn value(&self) -> PyObject {
self.0.value.lock().expect("poisoned").clone()
}
#[setter]
fn set_value(&self, value: PyObject) {
*self.0.value.lock().expect("poisoned") = value
}
#[getter]
fn memory(&self) -> usize {
self.0.memory
}
}
#[pyfunction]
fn get_global_list() -> Vec<LruCacheNode> {
let list = GLOBAL_LIST.lock().expect("poisoned");
let mut vec = Vec::new();
let mut cursor = list.front();
while let Some(n) = cursor.clone_pointer() {
vec.push(LruCacheNode(n));
cursor.move_next();
}
vec
}
intrusive_adapter!(LruCacheNodeAdapterPerCache = Arc<LruCacheNodeInner>: LruCacheNodeInner { per_cache_link: LinkedListLink });
intrusive_adapter!(LruCacheNodeAdapterGlobal = Arc<LruCacheNodeInner>: LruCacheNodeInner { global_list_link: LinkedListLink });
lazy_static! {
static ref GLOBAL_LIST_ADAPTER: LruCacheNodeAdapterGlobal = LruCacheNodeAdapterGlobal::new();
static ref GLOBAL_LIST: Arc<Mutex<LinkedList<LruCacheNodeAdapterGlobal>>> =
Arc::new(Mutex::new(LinkedList::new(GLOBAL_LIST_ADAPTER.clone())));
}

View File

@@ -44,6 +44,7 @@ from twisted.internet.interfaces import IReactorTime
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import get_jemalloc_stats
from synapse.synapse_rust.lru_cache import LruCacheNode, PerCacheLinkedList
from synapse.util import Clock, caches
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
from synapse.util.caches.treecache import (
@@ -456,25 +457,21 @@ class LruCache(Generic[KT, VT]):
list_root = ListNode[_Node[KT, VT]].create_root_node()
rust_linked_list = PerCacheLinkedList()
lock = threading.Lock()
def evict() -> None:
while cache_len() > self.max_size:
# Get the last node in the list (i.e. the oldest node).
todelete = list_root.prev_node
todelete = rust_linked_list.get_back()
# The list root should always have a valid `prev_node` if the
# cache is not empty.
assert todelete is not None
# The node should always have a reference to a cache entry, as
# we only drop the cache entry when we remove the node from the
# list.
node = todelete.get_cache_entry()
assert node is not None
evicted_len = delete_node(node)
cache.pop(node.key, None)
evicted_len = delete_node(todelete)
cache.pop(todelete.key, None)
if metrics:
metrics.inc_evictions(EvictionReason.size, evicted_len)
@@ -502,14 +499,13 @@ class LruCache(Generic[KT, VT]):
def add_node(
key: KT, value: VT, callbacks: Collection[Callable[[], None]] = ()
) -> None:
node: _Node[KT, VT] = _Node(
list_root,
node: _Node[KT, VT] = LruCacheNode(
self,
rust_linked_list,
key,
value,
weak_ref_to_self,
real_clock,
callbacks,
prune_unread_entries,
set(callbacks),
0,
)
cache[key] = node
@@ -520,7 +516,7 @@ class LruCache(Generic[KT, VT]):
metrics.inc_memory_usage(node.memory)
def move_node_to_front(node: _Node[KT, VT]) -> None:
node.move_to_front(real_clock, list_root)
node.move_to_front()
def delete_node(node: _Node[KT, VT]) -> int:
node.drop_from_lists()