mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-19 02:20:44 +00:00
Compare commits
3 Commits
travis/pre
...
erikj/rust
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1780a70748 | ||
|
|
f3b7940e14 | ||
|
|
53e83c76b2 |
@@ -3,3 +3,7 @@
|
|||||||
|
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["rust"]
|
members = ["rust"]
|
||||||
|
|
||||||
|
|
||||||
|
[profile.release]
|
||||||
|
debug = true
|
||||||
|
|||||||
1
changelog.d/13733.misc
Normal file
1
changelog.d/13733.misc
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Convert `LruCache` linked lists into Rust.
|
||||||
@@ -18,4 +18,7 @@ crate-type = ["cdylib"]
|
|||||||
name = "synapse.synapse_rust"
|
name = "synapse.synapse_rust"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
intrusive-collections = "0.9.4"
|
||||||
|
lazy_static = "1.4.0"
|
||||||
|
log = "0.4.17"
|
||||||
pyo3 = { version = "0.16.5", features = ["extension-module", "macros", "abi3", "abi3-py37"] }
|
pyo3 = { version = "0.16.5", features = ["extension-module", "macros", "abi3", "abi3-py37"] }
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
use pyo3::prelude::*;
|
use pyo3::prelude::*;
|
||||||
|
|
||||||
|
mod lru_cache;
|
||||||
|
|
||||||
/// Formats the sum of two numbers as string.
|
/// Formats the sum of two numbers as string.
|
||||||
#[pyfunction]
|
#[pyfunction]
|
||||||
#[pyo3(text_signature = "(a, b, /)")]
|
#[pyo3(text_signature = "(a, b, /)")]
|
||||||
@@ -9,8 +11,9 @@ fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
|
|||||||
|
|
||||||
/// The entry point for defining the Python module.
|
/// The entry point for defining the Python module.
|
||||||
#[pymodule]
|
#[pymodule]
|
||||||
fn synapse_rust(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
|
fn synapse_rust(py: Python<'_>, m: &PyModule) -> PyResult<()> {
|
||||||
m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
|
m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
|
||||||
|
|
||||||
|
lru_cache::register_module(py, m)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
236
rust/src/lru_cache.rs
Normal file
236
rust/src/lru_cache.rs
Normal file
@@ -0,0 +1,236 @@
|
|||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use intrusive_collections::{intrusive_adapter, LinkedListAtomicLink};
|
||||||
|
use intrusive_collections::{LinkedList, LinkedListLink};
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
use log::error;
|
||||||
|
use pyo3::prelude::*;
|
||||||
|
use pyo3::types::PySet;
|
||||||
|
|
||||||
|
/// Called when registering modules with python.
|
||||||
|
pub fn register_module(py: Python<'_>, m: &PyModule) -> PyResult<()> {
|
||||||
|
let child_module = PyModule::new(py, "push")?;
|
||||||
|
child_module.add_class::<LruCacheNode>()?;
|
||||||
|
child_module.add_class::<PerCacheLinkedList>()?;
|
||||||
|
child_module.add_function(wrap_pyfunction!(get_global_list, m)?)?;
|
||||||
|
|
||||||
|
m.add_submodule(child_module)?;
|
||||||
|
|
||||||
|
// We need to manually add the module to sys.modules to make `from
|
||||||
|
// synapse.synapse_rust import push` work.
|
||||||
|
py.import("sys")?
|
||||||
|
.getattr("modules")?
|
||||||
|
.set_item("synapse.synapse_rust.lru_cache", child_module)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pyclass]
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct PerCacheLinkedList(Arc<Mutex<LinkedList<LruCacheNodeAdapterPerCache>>>);
|
||||||
|
|
||||||
|
#[pymethods]
|
||||||
|
impl PerCacheLinkedList {
|
||||||
|
#[new]
|
||||||
|
fn new() -> PerCacheLinkedList {
|
||||||
|
PerCacheLinkedList(Default::default())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_back(&self) -> Option<LruCacheNode> {
|
||||||
|
let list = self.0.lock().expect("poisoned");
|
||||||
|
list.back().clone_pointer().map(|n| LruCacheNode(n))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct LruCacheNodeInner {
|
||||||
|
per_cache_link: LinkedListAtomicLink,
|
||||||
|
global_list_link: LinkedListAtomicLink,
|
||||||
|
per_cache_list: Arc<Mutex<LinkedList<LruCacheNodeAdapterPerCache>>>,
|
||||||
|
cache: Mutex<Option<PyObject>>,
|
||||||
|
key: PyObject,
|
||||||
|
value: Arc<Mutex<PyObject>>,
|
||||||
|
callbacks: Py<PySet>,
|
||||||
|
memory: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pyclass]
|
||||||
|
struct LruCacheNode(Arc<LruCacheNodeInner>);
|
||||||
|
|
||||||
|
#[pymethods]
|
||||||
|
impl LruCacheNode {
|
||||||
|
#[new]
|
||||||
|
fn py_new(
|
||||||
|
cache: PyObject,
|
||||||
|
cache_list: PerCacheLinkedList,
|
||||||
|
key: PyObject,
|
||||||
|
value: PyObject,
|
||||||
|
callbacks: Py<PySet>,
|
||||||
|
memory: usize,
|
||||||
|
) -> Self {
|
||||||
|
let node = Arc::new(LruCacheNodeInner {
|
||||||
|
per_cache_link: Default::default(),
|
||||||
|
global_list_link: Default::default(),
|
||||||
|
per_cache_list: cache_list.0,
|
||||||
|
cache: Mutex::new(Some(cache)),
|
||||||
|
key,
|
||||||
|
value: Arc::new(Mutex::new(value)),
|
||||||
|
callbacks,
|
||||||
|
memory,
|
||||||
|
});
|
||||||
|
|
||||||
|
GLOBAL_LIST
|
||||||
|
.lock()
|
||||||
|
.expect("posioned")
|
||||||
|
.push_front(node.clone());
|
||||||
|
|
||||||
|
node.per_cache_list
|
||||||
|
.lock()
|
||||||
|
.expect("posioned")
|
||||||
|
.push_front(node.clone());
|
||||||
|
|
||||||
|
LruCacheNode(node)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_callbacks(&self, py: Python<'_>, new_callbacks: &PyAny) -> PyResult<()> {
|
||||||
|
if new_callbacks.len()? == 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let current_callbacks = self.0.callbacks.as_ref(py);
|
||||||
|
|
||||||
|
for cb in new_callbacks.iter()? {
|
||||||
|
current_callbacks.add(cb?)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_and_clear_callbacks(&self, py: Python<'_>) {
|
||||||
|
let callbacks = self.0.callbacks.as_ref(py);
|
||||||
|
|
||||||
|
if callbacks.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for callback in callbacks {
|
||||||
|
if let Err(err) = callback.call0() {
|
||||||
|
error!("LruCacheNode callback errored: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
callbacks.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn drop_from_cache(&self) -> PyResult<()> {
|
||||||
|
let cache = self.0.cache.lock().expect("poisoned").take();
|
||||||
|
|
||||||
|
if let Some(cache) = cache {
|
||||||
|
Python::with_gil(|py| cache.call_method1(py, "pop", (&self.0.key, None::<()>)))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.drop_from_lists();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn drop_from_lists(&self) {
|
||||||
|
if self.0.global_list_link.is_linked() {
|
||||||
|
let mut glboal_list = GLOBAL_LIST.lock().expect("poisoned");
|
||||||
|
|
||||||
|
let mut curor_mut = unsafe {
|
||||||
|
// Getting the cursor is unsafe as we need to ensure the list link
|
||||||
|
// belongs to the given list.
|
||||||
|
glboal_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
|
||||||
|
};
|
||||||
|
|
||||||
|
curor_mut.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.0.per_cache_link.is_linked() {
|
||||||
|
let mut per_cache_list = self.0.per_cache_list.lock().expect("poisoned");
|
||||||
|
|
||||||
|
let mut curor_mut = unsafe {
|
||||||
|
// Getting the cursor is unsafe as we need to ensure the list link
|
||||||
|
// belongs to the given list.
|
||||||
|
per_cache_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
|
||||||
|
};
|
||||||
|
|
||||||
|
curor_mut.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn move_to_front(&self) {
|
||||||
|
if self.0.global_list_link.is_linked() {
|
||||||
|
let mut global_list = GLOBAL_LIST.lock().expect("poisoned");
|
||||||
|
|
||||||
|
let mut curor_mut = unsafe {
|
||||||
|
// Getting the cursor is unsafe as we need to ensure the list link
|
||||||
|
// belongs to the given list.
|
||||||
|
global_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
|
||||||
|
};
|
||||||
|
curor_mut.remove();
|
||||||
|
|
||||||
|
global_list.push_front(self.0.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.0.per_cache_link.is_linked() {
|
||||||
|
let mut per_cache_list = self.0.per_cache_list.lock().expect("poisoned");
|
||||||
|
|
||||||
|
let mut curor_mut = unsafe {
|
||||||
|
// Getting the cursor is unsafe as we need to ensure the list link
|
||||||
|
// belongs to the given list.
|
||||||
|
per_cache_list.cursor_mut_from_ptr(Arc::into_raw(self.0.clone()))
|
||||||
|
};
|
||||||
|
|
||||||
|
curor_mut.remove();
|
||||||
|
|
||||||
|
per_cache_list.push_front(self.0.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[getter]
|
||||||
|
fn key(&self) -> &PyObject {
|
||||||
|
&self.0.key
|
||||||
|
}
|
||||||
|
|
||||||
|
#[getter]
|
||||||
|
fn value(&self) -> PyObject {
|
||||||
|
self.0.value.lock().expect("poisoned").clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[setter]
|
||||||
|
fn set_value(&self, value: PyObject) {
|
||||||
|
*self.0.value.lock().expect("poisoned") = value
|
||||||
|
}
|
||||||
|
|
||||||
|
#[getter]
|
||||||
|
fn memory(&self) -> usize {
|
||||||
|
self.0.memory
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pyfunction]
|
||||||
|
fn get_global_list() -> Vec<LruCacheNode> {
|
||||||
|
let list = GLOBAL_LIST.lock().expect("poisoned");
|
||||||
|
|
||||||
|
let mut vec = Vec::new();
|
||||||
|
|
||||||
|
let mut cursor = list.front();
|
||||||
|
|
||||||
|
while let Some(n) = cursor.clone_pointer() {
|
||||||
|
vec.push(LruCacheNode(n));
|
||||||
|
|
||||||
|
cursor.move_next();
|
||||||
|
}
|
||||||
|
|
||||||
|
vec
|
||||||
|
}
|
||||||
|
|
||||||
|
intrusive_adapter!(LruCacheNodeAdapterPerCache = Arc<LruCacheNodeInner>: LruCacheNodeInner { per_cache_link: LinkedListLink });
|
||||||
|
intrusive_adapter!(LruCacheNodeAdapterGlobal = Arc<LruCacheNodeInner>: LruCacheNodeInner { global_list_link: LinkedListLink });
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static ref GLOBAL_LIST_ADAPTER: LruCacheNodeAdapterGlobal = LruCacheNodeAdapterGlobal::new();
|
||||||
|
static ref GLOBAL_LIST: Arc<Mutex<LinkedList<LruCacheNodeAdapterGlobal>>> =
|
||||||
|
Arc::new(Mutex::new(LinkedList::new(GLOBAL_LIST_ADAPTER.clone())));
|
||||||
|
}
|
||||||
@@ -44,6 +44,7 @@ from twisted.internet.interfaces import IReactorTime
|
|||||||
from synapse.config import cache as cache_config
|
from synapse.config import cache as cache_config
|
||||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||||
from synapse.metrics.jemalloc import get_jemalloc_stats
|
from synapse.metrics.jemalloc import get_jemalloc_stats
|
||||||
|
from synapse.synapse_rust.lru_cache import LruCacheNode, PerCacheLinkedList
|
||||||
from synapse.util import Clock, caches
|
from synapse.util import Clock, caches
|
||||||
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
|
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
|
||||||
from synapse.util.caches.treecache import (
|
from synapse.util.caches.treecache import (
|
||||||
@@ -456,25 +457,21 @@ class LruCache(Generic[KT, VT]):
|
|||||||
|
|
||||||
list_root = ListNode[_Node[KT, VT]].create_root_node()
|
list_root = ListNode[_Node[KT, VT]].create_root_node()
|
||||||
|
|
||||||
|
rust_linked_list = PerCacheLinkedList()
|
||||||
|
|
||||||
lock = threading.Lock()
|
lock = threading.Lock()
|
||||||
|
|
||||||
def evict() -> None:
|
def evict() -> None:
|
||||||
while cache_len() > self.max_size:
|
while cache_len() > self.max_size:
|
||||||
# Get the last node in the list (i.e. the oldest node).
|
# Get the last node in the list (i.e. the oldest node).
|
||||||
todelete = list_root.prev_node
|
todelete = rust_linked_list.get_back()
|
||||||
|
|
||||||
# The list root should always have a valid `prev_node` if the
|
# The list root should always have a valid `prev_node` if the
|
||||||
# cache is not empty.
|
# cache is not empty.
|
||||||
assert todelete is not None
|
assert todelete is not None
|
||||||
|
|
||||||
# The node should always have a reference to a cache entry, as
|
evicted_len = delete_node(todelete)
|
||||||
# we only drop the cache entry when we remove the node from the
|
cache.pop(todelete.key, None)
|
||||||
# list.
|
|
||||||
node = todelete.get_cache_entry()
|
|
||||||
assert node is not None
|
|
||||||
|
|
||||||
evicted_len = delete_node(node)
|
|
||||||
cache.pop(node.key, None)
|
|
||||||
if metrics:
|
if metrics:
|
||||||
metrics.inc_evictions(EvictionReason.size, evicted_len)
|
metrics.inc_evictions(EvictionReason.size, evicted_len)
|
||||||
|
|
||||||
@@ -502,14 +499,13 @@ class LruCache(Generic[KT, VT]):
|
|||||||
def add_node(
|
def add_node(
|
||||||
key: KT, value: VT, callbacks: Collection[Callable[[], None]] = ()
|
key: KT, value: VT, callbacks: Collection[Callable[[], None]] = ()
|
||||||
) -> None:
|
) -> None:
|
||||||
node: _Node[KT, VT] = _Node(
|
node: _Node[KT, VT] = LruCacheNode(
|
||||||
list_root,
|
self,
|
||||||
|
rust_linked_list,
|
||||||
key,
|
key,
|
||||||
value,
|
value,
|
||||||
weak_ref_to_self,
|
set(callbacks),
|
||||||
real_clock,
|
0,
|
||||||
callbacks,
|
|
||||||
prune_unread_entries,
|
|
||||||
)
|
)
|
||||||
cache[key] = node
|
cache[key] = node
|
||||||
|
|
||||||
@@ -520,7 +516,7 @@ class LruCache(Generic[KT, VT]):
|
|||||||
metrics.inc_memory_usage(node.memory)
|
metrics.inc_memory_usage(node.memory)
|
||||||
|
|
||||||
def move_node_to_front(node: _Node[KT, VT]) -> None:
|
def move_node_to_front(node: _Node[KT, VT]) -> None:
|
||||||
node.move_to_front(real_clock, list_root)
|
node.move_to_front()
|
||||||
|
|
||||||
def delete_node(node: _Node[KT, VT]) -> int:
|
def delete_node(node: _Node[KT, VT]) -> int:
|
||||||
node.drop_from_lists()
|
node.drop_from_lists()
|
||||||
|
|||||||
Reference in New Issue
Block a user