Mirror of https://github.com/element-hq/synapse.git, synced 2025-12-07 01:20:16 +00:00
Compare commits: erikj/tree...hs/hacked- (47 commits)
Commits in this comparison (SHA1):
1eb989fa7a, fb719663c5, 61293c86c1, fad91897ec, 64970400ae, f6be8041d8, f19795355b, 81dd28c216,
8bfb649cda, 7d33ba70df, dacc395dca, 6a20b4f32e, a5e400286d, 1f3e399ea0, 76288b9fbd, 89ad2f60ca,
1315b6a0f2, 5e8c7b4b05, d77072ff6e, f98d0985d3, 82fcd3cce8, 4be9d0918e, 5f631512b5, 9009d29b81,
16f23761bb, 45b1c58898, 316db51bed, 4a3260092d, 0831f16757, 52d3e5c796, 2585f57e60, 1e11898863,
48b35d9404, 2be3a0284f, f55d926682, 8b36deef2f, c2f5415afe, 2de6060266, 873da386d9, e9fbbf1342,
9ac17af4b4, c673e3ec1c, 5358283ac6, fc38c182bd, 963e1c6540, 0b117731ef, 264e9a6ee3
changelog.d/9379.feature (new file, 1 line)
@@ -0,0 +1 @@
Store cached events in the external redis cache, when redis is enabled.
@@ -676,6 +676,13 @@ retention:
#
#event_cache_size: 10K

# The expiry time of an event stored in the external cache (Redis). This
# time will be reset each time the event is accessed.
# This is only used when Redis is configured.
# Defaults to 30 minutes
#
#external_event_cache_expiry_ms: 1800000

caches:
  # Controls the global cache factor, which is the default cache factor
  # for all caches if a specific factor for that cache is not otherwise
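The sample-configuration hunk above documents the new option next to the existing event cache settings. Below is a minimal, hypothetical homeserver.yaml fragment showing how it could sit alongside the existing `event_cache_size`, `redis` and `caches` sections; the values are illustrative only (1800000 ms simply matches the documented 30-minute default).

```yaml
# Hypothetical homeserver.yaml fragment; values are illustrative.
event_cache_size: 10K              # existing in-memory (L1) event cache size

# New option from this branch: TTL for events held in the external (Redis)
# cache. The TTL is reset each time the event is read.
external_event_cache_expiry_ms: 1800000   # 30 minutes

redis:
  enabled: true                    # the external cache is only used when Redis is on
  host: localhost
  port: 6379

caches:
  global_factor: 0.5
```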
@@ -35,6 +35,7 @@ from synapse.logging.context import (
    make_deferred_yieldable,
    run_in_background,
)
from synapse.replication.tcp.external_cache import ExternalCache
from synapse.storage.database import DatabasePool, make_conn
from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore

@@ -208,13 +209,19 @@ class Store(
            "Attempt to set room_is_public during port_db: database not empty?"
        )


class MockHomeserver:
    def __init__(self, config):
        self.clock = Clock(reactor)
        self.config = config
        self.hostname = config.server_name
        self.version_string = "Synapse/" + get_version_string(synapse)
        self.external_cache = ExternalCache(self)

    def get_outbound_redis_connection(self):
        return None

    def get_external_cache(self):
        return self.external_cache

    def get_clock(self):
        return self.clock
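The port script's MockHomeserver gains an ExternalCache but returns None from get_outbound_redis_connection(), so every external-cache operation short-circuits while a database is being ported. A minimal sketch of that pattern, with hypothetical names rather than Synapse's real classes:

```python
# Sketch of the "optional external cache" pattern used above: the cache object
# always exists, but every operation is a no-op when no Redis connection was
# supplied. Names here are illustrative, not Synapse's actual API.
from typing import Any, Optional


class NullableExternalCache:
    def __init__(self, redis_connection: Optional[Any]) -> None:
        self._redis = redis_connection

    def is_enabled(self) -> bool:
        return self._redis is not None

    async def get(self, cache_name: str, key: str) -> Optional[Any]:
        if self._redis is None:  # mirrors ExternalCache's early return
            return None
        return await self._redis.get(f"{cache_name}:{key}")


# A port run hands the cache no connection at all, so callers always see a
# miss and fall through to the database instead of Redis.
cache = NullableExternalCache(redis_connection=None)
assert not cache.is_enabled()
```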
@@ -31,6 +31,8 @@ class RedisProtocol(protocol.Protocol):
        only_if_exists: bool = False,
    ) -> None: ...
    async def get(self, key: str) -> Any: ...
    async def delete(self, key: str) -> None: ...
    async def expire(self, key: str, expire: int) -> None: ...

class SubscriberProtocol(RedisProtocol):
    def __init__(self, *args, **kwargs): ...
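The stub hunk above (two added lines, apparently `delete` and `expire`) exists purely so the type checker knows about the Redis calls the new code makes. A small sketch, assuming only the signatures declared in the stub, of the kind of call pattern this makes type-checkable; it is not how Synapse itself drives txredisapi, which wraps the returned Deferreds with make_deferred_yieldable:

```python
# Typing illustration against the stubbed RedisProtocol; the stub declares
# get/delete/expire as async, so a type checker accepts awaiting them.
from typing import Any, Optional

from txredisapi import RedisProtocol


async def read_and_refresh(conn: RedisProtocol, key: str, ttl_s: int) -> Optional[Any]:
    value = await conn.get(key)        # -> Any, per the stub
    if value is not None:
        await conn.expire(key, ttl_s)  # reset the key's TTL
    return value


async def drop(conn: RedisProtocol, key: str) -> None:
    await conn.delete(key)             # one of the entries added above
```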
@@ -32,6 +32,7 @@ _CACHES_LOCK = threading.Lock()

_DEFAULT_FACTOR_SIZE = 0.5
_DEFAULT_EVENT_CACHE_SIZE = "10K"
_DEFAULT_EXTERNAL_CACHE_EXPIRY_MS = 30 * 60 * 1000  # 30 minutes


class CacheProperties:

@@ -115,6 +116,13 @@ class CacheConfig(Config):
        #
        #event_cache_size: 10K

        # The expiry time of an event stored in the external cache (Redis). This
        # time will be reset each time the event is accessed.
        # This is only used when Redis is configured.
        # Defaults to 30 minutes
        #
        #external_event_cache_expiry_ms: 1800000

        caches:
          # Controls the global cache factor, which is the default cache factor
          # for all caches if a specific factor for that cache is not otherwise

@@ -166,6 +174,13 @@ class CacheConfig(Config):
        self.event_cache_size = self.parse_size(
            config.get("event_cache_size", _DEFAULT_EVENT_CACHE_SIZE)
        )

        self.external_event_cache_expiry_ms = config.get(
            "external_event_cache_expiry_ms", _DEFAULT_EXTERNAL_CACHE_EXPIRY_MS
        )
        if not isinstance(self.external_event_cache_expiry_ms, (int, float)):
            raise ConfigError("external_event_cache_expiry_ms must be a number.")

        self.cache_factors: Dict[str, float] = {}

        cache_config = config.get("caches") or {}
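The second CacheConfig hunk reads the new option with a default and rejects non-numeric values. A standalone sketch of that parsing step, assuming a plain dict in place of Synapse's Config machinery and a local stand-in for ConfigError:

```python
# Standalone sketch of the option parsing above (not the real CacheConfig).
_DEFAULT_EXTERNAL_CACHE_EXPIRY_MS = 30 * 60 * 1000  # 30 minutes


class ConfigError(Exception):
    """Local stand-in for Synapse's ConfigError."""


def read_external_event_cache_expiry(config: dict) -> float:
    expiry_ms = config.get(
        "external_event_cache_expiry_ms", _DEFAULT_EXTERNAL_CACHE_EXPIRY_MS
    )
    if not isinstance(expiry_ms, (int, float)):
        raise ConfigError("external_event_cache_expiry_ms must be a number.")
    return expiry_ms


assert read_external_event_cache_expiry({}) == 1_800_000
assert read_external_event_cache_expiry({"external_event_cache_expiry_ms": 60000}) == 60000
```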
@@ -35,6 +35,12 @@ get_counter = Counter(
    labelnames=["cache_name", "hit"],
)

delete_counter = Counter(
    "synapse_external_cache_delete",
    "Number of times we deleted keys from a cache",
    labelnames=["cache_name"],
)

response_timer = Histogram(
    "synapse_external_cache_response_time_seconds",
    "Time taken to get a response from Redis for a cache get/set request",

@@ -72,7 +78,24 @@ class ExternalCache:
        """
        return self._redis_connection is not None

    async def set(self, cache_name: str, key: str, value: Any, expiry_ms: int) -> None:
    async def delete(self, cache_name: str, key: str) -> None:
        """Delete a key from the named cache."""

        if self._redis_connection is None:
            return
        delete_counter.labels(cache_name).inc()

        logger.debug("Deleting %s %s", cache_name, key)

        return await make_deferred_yieldable(
            self._redis_connection.delete(
                self._get_redis_key(cache_name, key),
            )
        )

    async def set(
        self, cache_name: str, key: str, value: Any, expiry_ms: Optional[int] = None
    ) -> None:
        """Add the key/value to the named cache, with the expiry time given."""

        if self._redis_connection is None:

@@ -95,15 +118,18 @@ class ExternalCache:
            )
        )

    async def get(self, cache_name: str, key: str) -> Optional[Any]:
    async def get(
        self, cache_name: str, key: str, expiry_ms: Optional[int] = None
    ) -> Optional[Any]:
        """Look up a key/value in the named cache."""

        if self._redis_connection is None:
            return None

        cache_key = self._get_redis_key(cache_name, key)
        with response_timer.labels("get").time():
            result = await make_deferred_yieldable(
                self._redis_connection.get(self._get_redis_key(cache_name, key))
                self._redis_connection.get(cache_key)
            )

        logger.debug("Got cache result %s %s: %r", cache_name, key, result)

@@ -113,6 +139,13 @@ class ExternalCache:
        if not result:
            return None

        if expiry_ms:
            # If we are using this key, bump the expiry time
            # NOTE: txredisapi does not support pexpire, so we must use (expire) seconds
            await make_deferred_yieldable(
                self._redis_connection.expire(cache_key, expiry_ms // 1000)
            )

        # For some reason the integers get magically converted back to integers
        if isinstance(result, int):
            return result
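The get hunk above gives reads a sliding expiry: every hit refreshes the key's TTL, and the configured milliseconds are floored to whole seconds because, as the hunk's own comment notes, txredisapi exposes EXPIRE but not PEXPIRE. A small self-contained sketch of that read-refresh behaviour against a hypothetical async Redis-like client, not Synapse's ExternalCache:

```python
# Sketch of "refresh the TTL on read" (a sliding expiry), assuming a minimal
# hypothetical Redis-like interface rather than txredisapi itself.
import json
from typing import Any, Optional, Protocol


class RedisLike(Protocol):
    async def get(self, key: str) -> Optional[str]: ...
    async def expire(self, key: str, seconds: int) -> None: ...


async def cache_get(conn: RedisLike, key: str, expiry_ms: Optional[int]) -> Optional[Any]:
    raw = await conn.get(key)
    if raw is None:
        return None
    if expiry_ms:
        # EXPIRE takes whole seconds, so the millisecond setting is floored;
        # anything under 1000 ms floors to 0, which Redis treats as deletion.
        await conn.expire(key, expiry_ms // 1000)
    return json.loads(raw)
```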
@@ -78,7 +78,7 @@ logger = logging.getLogger(__name__)
EVENT_QUEUE_THREADS = 3  # Max number of threads that will fetch events
EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events

GET_EVENT_CACHE_NAME = "getEvent"

@attr.s(slots=True, auto_attribs=True)
class _EventCacheEntry:

@@ -165,10 +165,14 @@ class EventsWorkerStore(SQLBaseStore):
            5 * 60 * 1000,
        )

        self._external_cache = hs.get_external_cache()
        self._get_event_cache = LruCache(
            cache_name="*getEvent*",
            max_size=hs.config.caches.event_cache_size,
        )
        self._external_cache_event_expiry_ms = (
            hs.config.caches.external_event_cache_expiry_ms
        )

        # Map from event ID to a deferred that will result in a map from event
        # ID to cache entry. Note that the returned dict may not have the

@@ -511,7 +515,7 @@ class EventsWorkerStore(SQLBaseStore):
        Returns:
            map from event id to result
        """
        event_entry_map = self._get_events_from_cache(
        event_entry_map = await self._get_events_from_cache(
            event_ids,
        )
@@ -593,8 +597,77 @@ class EventsWorkerStore(SQLBaseStore):

    def _invalidate_get_event_cache(self, event_id):
        self._get_event_cache.invalidate((event_id,))
        if self._external_cache.is_enabled():
            # XXX: Is there danger in doing this?
            # We could hold a set of recently evicted keys in memory if
            # we need this to be synchronous?
            run_as_background_process(
                "getEvent_external_cache_delete",
                self._external_cache.delete,
                GET_EVENT_CACHE_NAME,
                event_id,
            )

    def _get_events_from_cache(
    def create_external_cache_event_from_event(self, event, redacted_event=None):
        if redacted_event:
            redacted_event = self.create_external_cache_event_from_event(
                redacted_event
            )[0]

        event_dict = event.get_dict()

        for key, value in event.unsigned.items():
            if isinstance(value, EventBase):
                event_dict["unsigned"][key] = {"_cache_event_id": value.event_id}

        return _EventCacheEntry(
            event={
                "event_dict": event_dict,
                "room_version": event.room_version.identifier,
                "internal_metadata_dict": event.get_internal_metadata_dict(),
                "rejected_reason": event.rejected_reason,
                "stream_ordering": event.internal_metadata.stream_ordering,
            },
            redacted_event=redacted_event,
        )

    async def _create_event_cache_entry_from_external_cache_entry(
        self, external_entry: Tuple[JsonDict, Optional[JsonDict]]
    ) -> Optional[_EventCacheEntry]:
        """Create a _EventCacheEntry from a tuple of dicts
        Args:
            external_entry: A tuple of event, redacted_event
        Returns:
            A _EventCacheEntry containing the frozen event(s)
        """
        event_dict = external_entry[0].get("event_dict")
        for key, value in event_dict.get("unsigned", {}).items():
            # If unsigned contained any events, get them now
            if isinstance(value, dict) and value.get("_cache_event_id"):
                event_dict["unsigned"][key] = await self.get_event(
                    value["_cache_event_id"]
                )

        original_ev = make_event_from_dict(
            event_dict=event_dict,
            room_version=KNOWN_ROOM_VERSIONS[external_entry[0].get("room_version")],
            internal_metadata_dict=external_entry[0].get("internal_metadata_dict"),
            rejected_reason=external_entry[0].get("rejected_reason"),
        )
        original_ev.internal_metadata.stream_ordering = external_entry[0].get(
            "stream_ordering"
        )
        redacted_ev = None
        if external_entry[1]:
            redacted_ev = make_event_from_dict(
                event_dict=external_entry[1].get("event_dict"),
                room_version=KNOWN_ROOM_VERSIONS[external_entry[1].get("room_version")],
                internal_metadata_dict=external_entry[1].get("internal_metadata_dict"),
                rejected_reason=external_entry[1].get("rejected_reason"),
            )
        return _EventCacheEntry(event=original_ev, redacted_event=redacted_ev)

    async def _get_events_from_cache(
        self, events: Iterable[str], update_metrics: bool = True
    ) -> Dict[str, _EventCacheEntry]:
        """Fetch events from the caches.
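create_external_cache_event_from_event and its async inverse exist because a FrozenEvent cannot be stored in Redis directly: the event is flattened to JSON-safe dicts, and any EventBase found under `unsigned` (for example `redacted_because`) is replaced by a `_cache_event_id` placeholder that is resolved again via get_event when the entry is read back. A toy version of that placeholder round-trip, using plain dicts instead of Synapse's event classes:

```python
# Toy round-trip of the "_cache_event_id" placeholder idea above, using plain
# dicts in place of FrozenEvent / make_event_from_dict.
from typing import Any, Callable, Dict

EventDict = Dict[str, Any]


def flatten_for_cache(event: EventDict) -> EventDict:
    """Replace nested events under `unsigned` with an ID placeholder."""
    flat = dict(event)
    flat["unsigned"] = {
        key: {"_cache_event_id": value["event_id"]}
        if isinstance(value, dict) and "event_id" in value
        else value
        for key, value in event.get("unsigned", {}).items()
    }
    return flat


def rehydrate(flat: EventDict, get_event: Callable[[str], EventDict]) -> EventDict:
    """Resolve placeholders via a lookup callable (self.get_event in the hunk)."""
    out = dict(flat)
    out["unsigned"] = {
        key: get_event(value["_cache_event_id"])
        if isinstance(value, dict) and "_cache_event_id" in value
        else value
        for key, value in flat.get("unsigned", {}).items()
    }
    return out


# Example: a redaction event nested under `unsigned` survives the round-trip.
store = {"$redaction": {"event_id": "$redaction", "type": "m.room.redaction"}}
original = {"event_id": "$evt", "unsigned": {"redacted_because": store["$redaction"]}}
assert rehydrate(flatten_for_cache(original), store.__getitem__) == original
```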
@@ -608,9 +681,27 @@ class EventsWorkerStore(SQLBaseStore):
        event_map = {}

        for event_id in events:
            # L1 cache - internal
            ret = self._get_event_cache.get(
                (event_id,), None, update_metrics=update_metrics
            )

            if not ret and self._external_cache.is_enabled():
                # L2 cache - external
                cache_result = await self._external_cache.get(
                    GET_EVENT_CACHE_NAME,
                    event_id,
                    self._external_cache_event_expiry_ms,
                )
                if cache_result:
                    ret = (
                        await self._create_event_cache_entry_from_external_cache_entry(
                            cache_result
                        )
                    )
                    # We got a hit here, store it in the L1 cache
                    self._get_event_cache.set((event_id,), ret)

            if not ret:
                continue
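The lookup above is a two-tier read-through: check the in-process LRU (L1), fall back to Redis (L2) with the configured expiry, and promote L2 hits back into L1. A generic sketch of that shape, detached from Synapse's event machinery (all names here are illustrative); note that, as in the hunk, a miss in both tiers is simply left for the database query later, and nothing negative is cached:

```python
# Generic L1 (in-process dict/LRU) over L2 (async external store) read-through,
# with promotion of L2 hits into L1, the same shape as the hunk above.
from typing import Any, Awaitable, Callable, Dict, Iterable, Optional


async def read_through(
    keys: Iterable[str],
    l1: Dict[str, Any],
    l2_get: Callable[[str], Awaitable[Optional[Any]]],
) -> Dict[str, Any]:
    found: Dict[str, Any] = {}
    for key in keys:
        value = l1.get(key)
        if value is None:
            value = await l2_get(key)   # may be a Redis round-trip per key
            if value is not None:
                l1[key] = value         # promote the L2 hit into L1
        if value is not None:
            found[key] = value
    return found
```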
@@ -889,10 +980,22 @@ class EventsWorkerStore(SQLBaseStore):
            cache_entry = _EventCacheEntry(
                event=original_ev, redacted_event=redacted_event
            )

            self._get_event_cache.set((event_id,), cache_entry)
            result_map[event_id] = cache_entry

            if self._external_cache.is_enabled():
                # Store in the L2 cache
                # Redis cannot store a FrozenEvent, so we transform these
                # into two dicts
                redis_cache_entry = self.create_external_cache_event_from_event(
                    original_ev, redacted_event
                )
                await self._external_cache.set(
                    GET_EVENT_CACHE_NAME,
                    event_id,
                    redis_cache_entry,
                )

        return result_map

    async def _enqueue_events(self, events):
@@ -629,7 +629,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
        # We don't update the event cache hit ratio as it completely throws off
        # the hit ratio counts. After all, we don't populate the cache if we
        # miss it here
        event_map = self._get_events_from_cache(member_event_ids, update_metrics=False)
        event_map = await self._get_events_from_cache(member_event_ids, update_metrics=False)

        missing_member_event_ids = []
        for event_id in member_event_ids:
@@ -525,6 +525,10 @@ class FakeRedisPubSubProtocol(Protocol):
            self.send("OK")
        elif command == b"GET":
            self.send(None)
        elif command == b"DEL":
            self.send("OK")
        elif command == b"EXPIRE":
            self.send("OK")
        else:
            raise Exception("Unknown command")
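The fake protocol above acknowledges DEL and EXPIRE and always answers GET with nil, so the replication tests exercise the miss path of the L2 cache without a real Redis. Purely as an illustration (not part of this branch), a slightly more stateful fake that could also serve hits might look like:

```python
# Hypothetical, stateful variant of a fake Redis command handler: remembers
# SET values so GET can return hits, and honours DEL. Not part of this branch.
from typing import Dict, Optional


class FakeRedisState:
    def __init__(self) -> None:
        self._data: Dict[bytes, bytes] = {}

    def handle(self, command: bytes, *args: bytes) -> Optional[bytes]:
        if command == b"SET":
            key, value = args[0], args[1]
            self._data[key] = value
            return b"OK"
        if command == b"GET":
            return self._data.get(args[0])  # None maps to a nil reply
        if command == b"DEL":
            self._data.pop(args[0], None)
            return b"OK"
        if command == b"EXPIRE":
            return b"OK"                    # TTLs are ignored in this fake
        raise Exception("Unknown command")
```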