mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
8 Commits
dmr/storag
...
rei/gsgfg2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
20fcc711e8 | ||
|
|
9986edba00 | ||
|
|
699f2197e3 | ||
|
|
115970d0d7 | ||
|
|
fda00e102b | ||
|
|
967427c1b7 | ||
|
|
92253361c4 | ||
|
|
471266d0fd |
1
changelog.d/10681.misc
Normal file
1
changelog.d/10681.misc
Normal file
@@ -0,0 +1 @@
|
||||
Deduplicate requests in `_get_state_for_groups`.
|
||||
1
mypy.ini
1
mypy.ini
@@ -96,6 +96,7 @@ files =
|
||||
tests/handlers/test_sync.py,
|
||||
tests/rest/client/test_login.py,
|
||||
tests/rest/client/test_auth.py,
|
||||
tests/storage/test_state.py,
|
||||
tests/util/test_itertools.py,
|
||||
tests/util/test_stream_change_cache.py
|
||||
|
||||
|
||||
@@ -14,21 +14,31 @@
|
||||
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from typing import Dict, Iterable, List, Optional, Set, Tuple
|
||||
from typing import Dict, FrozenSet, Iterable, List, Optional, Set, Tuple, Union
|
||||
|
||||
import attr
|
||||
|
||||
from twisted.internet.defer import Deferred
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.storage.util.sequence import build_sequence_generator
|
||||
from synapse.types import MutableStateMap, StateMap
|
||||
from synapse.types import MutableStateMap, StateKey, StateMap
|
||||
from synapse.util.caches.descriptors import cached
|
||||
from synapse.util.caches.dictionary_cache import DictionaryCache
|
||||
from synapse.util.caches.multi_key_response_cache import MultiKeyResponseCache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
InflightStateGroupCacheKey = Union[
|
||||
Tuple[int, StateFilter], Tuple[int, str, Optional[str]]
|
||||
]
|
||||
|
||||
|
||||
MAX_STATE_DELTA_HOPS = 100
|
||||
|
||||
@@ -81,16 +91,29 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
# We size the non-members cache to be smaller than the members cache as the
|
||||
# vast majority of state in Matrix (today) is member events.
|
||||
|
||||
self._state_group_cache = DictionaryCache(
|
||||
self._state_group_cache: DictionaryCache[int, StateKey] = DictionaryCache(
|
||||
"*stateGroupCache*",
|
||||
# TODO: this hasn't been tuned yet
|
||||
50000,
|
||||
)
|
||||
self._state_group_members_cache = DictionaryCache(
|
||||
self._state_group_members_cache: DictionaryCache[
|
||||
int, StateKey
|
||||
] = DictionaryCache(
|
||||
"*stateGroupMembersCache*",
|
||||
500000,
|
||||
)
|
||||
|
||||
self._state_group_inflight_cache: MultiKeyResponseCache[
|
||||
InflightStateGroupCacheKey, Dict[int, StateMap[str]]
|
||||
] = MultiKeyResponseCache(
|
||||
self.hs.get_clock(),
|
||||
"*stateGroupInflightCache*",
|
||||
# As the results from this transaction immediately go into the
|
||||
# immediate caches _state_group_cache and _state_group_members_cache,
|
||||
# we do not keep them in the in-flight cache when done.
|
||||
timeout_ms=0,
|
||||
)
|
||||
|
||||
def get_max_state_group_txn(txn: Cursor):
|
||||
txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
|
||||
return txn.fetchone()[0]
|
||||
@@ -168,13 +191,18 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
|
||||
return results
|
||||
|
||||
def _get_state_for_group_using_cache(self, cache, group, state_filter):
|
||||
def _get_state_for_group_using_cache(
|
||||
self,
|
||||
cache: DictionaryCache[int, StateKey],
|
||||
group: int,
|
||||
state_filter: StateFilter,
|
||||
) -> Tuple[MutableStateMap[str], bool]:
|
||||
"""Checks if group is in cache. See `_get_state_for_groups`
|
||||
|
||||
Args:
|
||||
cache(DictionaryCache): the state group cache to use
|
||||
group(int): The state group to lookup
|
||||
state_filter (StateFilter): The state filter used to fetch state
|
||||
cache: the state group cache to use
|
||||
group: The state group to lookup
|
||||
state_filter: The state filter used to fetch state
|
||||
from the database.
|
||||
|
||||
Returns 2-tuple (`state_dict`, `got_all`).
|
||||
@@ -212,7 +240,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
|
||||
) -> Dict[int, MutableStateMap[str]]:
|
||||
"""Gets the state at each of a list of state groups, optionally
|
||||
filtering by type/state_key
|
||||
filtering by type/state_key.
|
||||
|
||||
Args:
|
||||
groups: list of state groups for which we want
|
||||
@@ -221,11 +249,83 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
from the database.
|
||||
Returns:
|
||||
Dict of state group to state map.
|
||||
"""
|
||||
state_filter = state_filter or StateFilter.all()
|
||||
|
||||
|
||||
The flow for this function looks as follows:
|
||||
|
||||
* Query the immediate caches (self._state_group_cache,
|
||||
| self._state_group_members_cache).
|
||||
NONSTOP |
|
||||
|
|
||||
* Query the in-flight cache (self._state_group_inflight_cache)
|
||||
| for immediate-cache misses.
|
||||
NONSTOP |
|
||||
|
|
||||
* Service cache misses:
|
||||
| - Expand the state filter (to help cache hit ratio).
|
||||
| - Start a new transaction to fetch outstanding groups.
|
||||
| - Register entries in the in-flight cache for this transaction.
|
||||
| - (When the transaction is finished) Register entries in
|
||||
| the immediate caches.
|
||||
|
|
||||
* Wait for in-flight requests to finish...
|
||||
|
|
||||
* Assemble everything together and filter out anything we didn't
|
||||
ask for.
|
||||
|
||||
The sections marked NONSTOP must not contain any `await`s, otherwise
|
||||
race conditions could occur and the cache could be made less effective.
|
||||
"""
|
||||
|
||||
def try_combine_inflight_requests(
|
||||
group: int,
|
||||
state_filter: StateFilter,
|
||||
mut_inflight_requests: "List[Tuple[int, Deferred[Dict[int, StateMap[str]]]]]",
|
||||
) -> bool:
|
||||
"""
|
||||
Tries to collect existing in-flight requests that would give us all
|
||||
the desired state for the given group.
|
||||
|
||||
Returns true if successful, or false if not.
|
||||
If successful, adds more in-flight requests to the `mut_inflight_requests` list.
|
||||
"""
|
||||
original_inflight_requests = len(mut_inflight_requests)
|
||||
for event_type, state_keys in state_filter.types.items():
|
||||
# First see if any requests are looking up ALL state keys for this
|
||||
# event type.
|
||||
result = self._state_group_inflight_cache.get((group, event_type, None))
|
||||
if result is not None:
|
||||
inflight_requests.append((group, make_deferred_yieldable(result)))
|
||||
continue
|
||||
|
||||
if state_keys is None:
|
||||
# We want all state keys, but there isn't a request in-flight
|
||||
# wanting them all, so we have to give up here.
|
||||
del mut_inflight_requests[original_inflight_requests:]
|
||||
return False
|
||||
else:
|
||||
# If we are only interested in certain state keys,
|
||||
# we can see if other in-flight requests would manage to
|
||||
# give us all the wanted state keys.
|
||||
for state_key in state_keys:
|
||||
result = self._state_group_inflight_cache.get(
|
||||
(group, event_type, state_key)
|
||||
)
|
||||
if result is None:
|
||||
# There isn't an in-flight request already requesting
|
||||
# this, so give up here.
|
||||
del mut_inflight_requests[original_inflight_requests:]
|
||||
return False
|
||||
|
||||
inflight_requests.append(
|
||||
(group, make_deferred_yieldable(result))
|
||||
)
|
||||
return True
|
||||
|
||||
state_filter = state_filter or StateFilter.all()
|
||||
member_filter, non_member_filter = state_filter.get_member_split()
|
||||
|
||||
# QUERY THE IMMEDIATE CACHES
|
||||
# Now we look them up in the member and non-member caches
|
||||
(
|
||||
non_member_state,
|
||||
@@ -242,43 +342,131 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
for group in groups:
|
||||
state[group].update(member_state[group])
|
||||
|
||||
# Now fetch any missing groups from the database
|
||||
|
||||
incomplete_groups = incomplete_groups_m | incomplete_groups_nm
|
||||
|
||||
if not incomplete_groups:
|
||||
return state
|
||||
|
||||
cache_sequence_nm = self._state_group_cache.sequence
|
||||
cache_sequence_m = self._state_group_members_cache.sequence
|
||||
# QUERY THE IN-FLIGHT CACHE
|
||||
# list (group ID -> Deferred that will contain a result for that group)
|
||||
inflight_requests: List[Tuple[int, Deferred[Dict[int, StateMap[str]]]]] = []
|
||||
inflight_cache_misses: List[int] = []
|
||||
|
||||
# Help the cache hit ratio by expanding the filter a bit
|
||||
# When we get around to requesting state from the database, we help the
|
||||
# cache hit ratio by expanding the filter a bit.
|
||||
# However, we need to know this now so that we can properly query the
|
||||
# in-flight cache where include_others is concerned.
|
||||
db_state_filter = state_filter.return_expanded()
|
||||
|
||||
group_to_state_dict = await self._get_state_groups_from_groups(
|
||||
list(incomplete_groups), state_filter=db_state_filter
|
||||
)
|
||||
for group in incomplete_groups:
|
||||
event_type: str
|
||||
state_keys: Optional[FrozenSet[str]]
|
||||
|
||||
# Now lets update the caches
|
||||
self._insert_into_cache(
|
||||
group_to_state_dict,
|
||||
db_state_filter,
|
||||
cache_seq_num_members=cache_sequence_m,
|
||||
cache_seq_num_non_members=cache_sequence_nm,
|
||||
)
|
||||
# First check if our exact state filter is being looked up.
|
||||
result = self._state_group_inflight_cache.get((group, db_state_filter))
|
||||
if result is not None:
|
||||
inflight_requests.append((group, make_deferred_yieldable(result)))
|
||||
continue
|
||||
|
||||
# Then check if the universal state filter is being looked up.
|
||||
result = self._state_group_inflight_cache.get((group, StateFilter.all()))
|
||||
if result is not None:
|
||||
inflight_requests.append((group, make_deferred_yieldable(result)))
|
||||
continue
|
||||
|
||||
if state_filter.include_others:
|
||||
# if the state filter includes others, we only match against the
|
||||
# state filter directly, so we give up here.
|
||||
# This is because it's too complex to cache this case properly.
|
||||
inflight_cache_misses.append(group)
|
||||
continue
|
||||
elif not db_state_filter.include_others:
|
||||
# Try looking to see if the same filter but with include_others
|
||||
# is being looked up.
|
||||
result = self._state_group_inflight_cache.get(
|
||||
(group, attr.evolve(db_state_filter, include_others=True))
|
||||
)
|
||||
if result is not None:
|
||||
inflight_requests.append((group, make_deferred_yieldable(result)))
|
||||
continue
|
||||
|
||||
if try_combine_inflight_requests(group, state_filter, inflight_requests):
|
||||
# succeeded in finding in-flight requests that could be combined
|
||||
# together to give all the state we need for this group.
|
||||
continue
|
||||
|
||||
inflight_cache_misses.append(group)
|
||||
|
||||
# SERVICE CACHE MISSES
|
||||
if inflight_cache_misses:
|
||||
cache_sequence_nm = self._state_group_cache.sequence
|
||||
cache_sequence_m = self._state_group_members_cache.sequence
|
||||
|
||||
async def get_state_groups_from_groups_then_add_to_cache() -> Dict[
|
||||
int, StateMap[str]
|
||||
]:
|
||||
groups_to_state_dict = await self._get_state_groups_from_groups(
|
||||
list(inflight_cache_misses), state_filter=db_state_filter
|
||||
)
|
||||
|
||||
# Now let's update the caches.
|
||||
self._insert_into_cache(
|
||||
groups_to_state_dict,
|
||||
db_state_filter,
|
||||
cache_seq_num_members=cache_sequence_m,
|
||||
cache_seq_num_non_members=cache_sequence_nm,
|
||||
)
|
||||
|
||||
return groups_to_state_dict
|
||||
|
||||
# Make a list of keys for us to store in the in-flight cache.
|
||||
# This should list all the keys that the request will pick up from
|
||||
# the database.
|
||||
keys: List[InflightStateGroupCacheKey] = []
|
||||
for group in inflight_cache_misses:
|
||||
if db_state_filter.include_others:
|
||||
# We can't properly add cache keys for all the 'other'
|
||||
# state keys that `include_others` specifies (since there are
|
||||
# an unlimited number of 'other' state keys), but we can
|
||||
# add a cache key with the exact state filter in use
|
||||
# (in addition to cache keys specifying the definite state
|
||||
# keys we are requesting).
|
||||
keys.append((group, db_state_filter))
|
||||
|
||||
for event_type, state_keys in db_state_filter.types.items():
|
||||
if state_keys is None:
|
||||
keys.append((group, event_type, None))
|
||||
else:
|
||||
for state_key in state_keys:
|
||||
keys.append((group, event_type, state_key))
|
||||
|
||||
spawned_request = self._state_group_inflight_cache.set_and_compute(
|
||||
tuple(keys), get_state_groups_from_groups_then_add_to_cache
|
||||
)
|
||||
for group in inflight_cache_misses:
|
||||
inflight_requests.append((group, spawned_request))
|
||||
|
||||
# WAIT FOR IN-FLIGHT REQUESTS TO FINISH
|
||||
for group, inflight_request in inflight_requests:
|
||||
request_result = await inflight_request
|
||||
state[group].update(request_result[group])
|
||||
|
||||
# ASSEMBLE
|
||||
# And finally update the result dict, by filtering out any extra
|
||||
# stuff we pulled out of the database.
|
||||
for group, group_state_dict in group_to_state_dict.items():
|
||||
for group in groups:
|
||||
# We just replace any existing entries, as we will have loaded
|
||||
# everything we need from the database anyway.
|
||||
state[group] = state_filter.filter_state(group_state_dict)
|
||||
state[group] = state_filter.filter_state(state[group])
|
||||
|
||||
return state
|
||||
|
||||
def _get_state_for_groups_using_cache(
|
||||
self, groups: Iterable[int], cache: DictionaryCache, state_filter: StateFilter
|
||||
) -> Tuple[Dict[int, StateMap[str]], Set[int]]:
|
||||
self,
|
||||
groups: Iterable[int],
|
||||
cache: DictionaryCache[int, StateKey],
|
||||
state_filter: StateFilter,
|
||||
) -> Tuple[Dict[int, MutableStateMap[str]], Set[int]]:
|
||||
"""Gets the state at each of a list of state groups, optionally
|
||||
filtering by type/state_key, querying from a specific cache.
|
||||
|
||||
|
||||
@@ -25,12 +25,15 @@ from typing import (
|
||||
)
|
||||
|
||||
import attr
|
||||
from frozendict import frozendict
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import MutableStateMap, StateMap
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import FrozenSet # noqa: used within quoted type hint; flake8 sad
|
||||
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.databases import Databases
|
||||
|
||||
@@ -40,7 +43,7 @@ logger = logging.getLogger(__name__)
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
@attr.s(slots=True, frozen=True)
|
||||
class StateFilter:
|
||||
"""A filter used when querying for state.
|
||||
|
||||
@@ -53,14 +56,19 @@ class StateFilter:
|
||||
appear in `types`.
|
||||
"""
|
||||
|
||||
types = attr.ib(type=Dict[str, Optional[Set[str]]])
|
||||
types = attr.ib(type="frozendict[str, Optional[FrozenSet[str]]]")
|
||||
include_others = attr.ib(default=False, type=bool)
|
||||
|
||||
def __attrs_post_init__(self):
|
||||
# If `include_others` is set we canonicalise the filter by removing
|
||||
# wildcards from the types dictionary
|
||||
if self.include_others:
|
||||
self.types = {k: v for k, v in self.types.items() if v is not None}
|
||||
# REVIEW: yucky
|
||||
object.__setattr__(
|
||||
self,
|
||||
"types",
|
||||
frozendict({k: v for k, v in self.types.items() if v is not None}),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def all() -> "StateFilter":
|
||||
@@ -69,7 +77,7 @@ class StateFilter:
|
||||
Returns:
|
||||
The new state filter.
|
||||
"""
|
||||
return StateFilter(types={}, include_others=True)
|
||||
return StateFilter(types=frozendict(), include_others=True)
|
||||
|
||||
@staticmethod
|
||||
def none() -> "StateFilter":
|
||||
@@ -78,7 +86,7 @@ class StateFilter:
|
||||
Returns:
|
||||
The new state filter.
|
||||
"""
|
||||
return StateFilter(types={}, include_others=False)
|
||||
return StateFilter(types=frozendict(), include_others=False)
|
||||
|
||||
@staticmethod
|
||||
def from_types(types: Iterable[Tuple[str, Optional[str]]]) -> "StateFilter":
|
||||
@@ -103,7 +111,12 @@ class StateFilter:
|
||||
|
||||
type_dict.setdefault(typ, set()).add(s) # type: ignore
|
||||
|
||||
return StateFilter(types=type_dict)
|
||||
return StateFilter(
|
||||
types=frozendict(
|
||||
(k, frozenset(v) if v is not None else None)
|
||||
for k, v in type_dict.items()
|
||||
)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def from_lazy_load_member_list(members: Iterable[str]) -> "StateFilter":
|
||||
@@ -116,7 +129,10 @@ class StateFilter:
|
||||
Returns:
|
||||
The new state filter
|
||||
"""
|
||||
return StateFilter(types={EventTypes.Member: set(members)}, include_others=True)
|
||||
return StateFilter(
|
||||
types=frozendict({EventTypes.Member: frozenset(members)}),
|
||||
include_others=True,
|
||||
)
|
||||
|
||||
def return_expanded(self) -> "StateFilter":
|
||||
"""Creates a new StateFilter where type wild cards have been removed
|
||||
@@ -173,7 +189,7 @@ class StateFilter:
|
||||
# We want to return all non-members, but only particular
|
||||
# memberships
|
||||
return StateFilter(
|
||||
types={EventTypes.Member: self.types[EventTypes.Member]},
|
||||
types=frozendict({EventTypes.Member: self.types[EventTypes.Member]}),
|
||||
include_others=True,
|
||||
)
|
||||
|
||||
@@ -245,14 +261,15 @@ class StateFilter:
|
||||
|
||||
return len(self.concrete_types())
|
||||
|
||||
def filter_state(self, state_dict: StateMap[T]) -> StateMap[T]:
|
||||
"""Returns the state filtered with by this StateFilter
|
||||
def filter_state(self, state_dict: StateMap[T]) -> MutableStateMap[T]:
|
||||
"""Returns the state filtered with by this StateFilter.
|
||||
|
||||
Args:
|
||||
state: The state map to filter
|
||||
|
||||
Returns:
|
||||
The filtered state map
|
||||
The filtered state map.
|
||||
This is a copy, so it's safe to mutate.
|
||||
"""
|
||||
if self.is_full():
|
||||
return dict(state_dict)
|
||||
@@ -324,14 +341,16 @@ class StateFilter:
|
||||
if state_keys is None:
|
||||
member_filter = StateFilter.all()
|
||||
else:
|
||||
member_filter = StateFilter({EventTypes.Member: state_keys})
|
||||
member_filter = StateFilter(frozendict({EventTypes.Member: state_keys}))
|
||||
elif self.include_others:
|
||||
member_filter = StateFilter.all()
|
||||
else:
|
||||
member_filter = StateFilter.none()
|
||||
|
||||
non_member_filter = StateFilter(
|
||||
types={k: v for k, v in self.types.items() if k != EventTypes.Member},
|
||||
types=frozendict(
|
||||
{k: v for k, v in self.types.items() if k != EventTypes.Member}
|
||||
),
|
||||
include_others=self.include_others,
|
||||
)
|
||||
|
||||
|
||||
210
synapse/util/caches/multi_key_response_cache.py
Normal file
210
synapse/util/caches/multi_key_response_cache.py
Normal file
@@ -0,0 +1,210 @@
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import Any, Awaitable, Callable, Dict, Generic, Optional, Tuple, TypeVar
|
||||
|
||||
import attr
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.util import Clock
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.caches import register_cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# the type of the key in the cache
|
||||
KV = TypeVar("KV")
|
||||
|
||||
# the type of the result from the operation
|
||||
RV = TypeVar("RV")
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class MultiKeyResponseCacheContext(Generic[KV]):
|
||||
"""Information about a missed MultiKeyResponseCache hit
|
||||
|
||||
This object can be passed into the callback for additional feedback
|
||||
"""
|
||||
|
||||
cache_keys: Tuple[KV, ...]
|
||||
"""The cache key that caused the cache miss
|
||||
|
||||
This should be considered read-only.
|
||||
|
||||
TODO: in attrs 20.1, make it frozen with an on_setattr.
|
||||
"""
|
||||
|
||||
should_cache: bool = True
|
||||
"""Whether the result should be cached once the request completes.
|
||||
|
||||
This can be modified by the callback if it decides its result should not be cached.
|
||||
"""
|
||||
|
||||
|
||||
class MultiKeyResponseCache(Generic[KV, RV]):
|
||||
"""
|
||||
This caches a deferred response. Until the deferred completes it will be
|
||||
returned from the cache. This means that if the client retries the request
|
||||
while the response is still being computed, that original response will be
|
||||
used rather than trying to compute a new response.
|
||||
|
||||
Unlike the plain ResponseCache, this cache admits multiple keys to the
|
||||
deferred response.
|
||||
"""
|
||||
|
||||
def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
|
||||
# This is poorly-named: it includes both complete and incomplete results.
|
||||
# We keep complete results rather than switching to absolute values because
|
||||
# that makes it easier to cache Failure results.
|
||||
self.pending_result_cache: Dict[KV, ObservableDeferred[RV]] = {}
|
||||
|
||||
self.clock = clock
|
||||
self.timeout_sec = timeout_ms / 1000.0
|
||||
|
||||
self._name = name
|
||||
self._metrics = register_cache(
|
||||
"multikey_response_cache", name, self, resizable=False
|
||||
)
|
||||
|
||||
def size(self) -> int:
|
||||
return len(self.pending_result_cache)
|
||||
|
||||
def __len__(self) -> int:
|
||||
return self.size()
|
||||
|
||||
def get(self, key: KV) -> "Optional[defer.Deferred[RV]]":
|
||||
"""Look up the given key.
|
||||
|
||||
Returns a new Deferred (which also doesn't follow the synapse
|
||||
logcontext rules). You will probably want to make_deferred_yieldable the result.
|
||||
|
||||
If there is no entry for the key, returns None.
|
||||
|
||||
Args:
|
||||
key: key to get/set in the cache
|
||||
|
||||
Returns:
|
||||
None if there is no entry for this key; otherwise a deferred which
|
||||
resolves to the result.
|
||||
"""
|
||||
result = self.pending_result_cache.get(key)
|
||||
if result is not None:
|
||||
self._metrics.inc_hits()
|
||||
return result.observe()
|
||||
else:
|
||||
self._metrics.inc_misses()
|
||||
return None
|
||||
|
||||
def _set(
|
||||
self, context: MultiKeyResponseCacheContext[KV], deferred: "defer.Deferred[RV]"
|
||||
) -> "defer.Deferred[RV]":
|
||||
"""Set the entry for the given key to the given deferred.
|
||||
|
||||
*deferred* should run its callbacks in the sentinel logcontext (ie,
|
||||
you should wrap normal synapse deferreds with
|
||||
synapse.logging.context.run_in_background).
|
||||
|
||||
Returns a new Deferred (which also doesn't follow the synapse logcontext rules).
|
||||
You will probably want to make_deferred_yieldable the result.
|
||||
|
||||
Args:
|
||||
context: Information about the cache miss
|
||||
deferred: The deferred which resolves to the result.
|
||||
|
||||
Returns:
|
||||
A new deferred which resolves to the actual result.
|
||||
"""
|
||||
result = ObservableDeferred(deferred, consumeErrors=True)
|
||||
keys = context.cache_keys
|
||||
for key in keys:
|
||||
if key not in self.pending_result_cache:
|
||||
# we only add the key if it's not already there, since we assume
|
||||
# that we won't overtake prior entries.
|
||||
self.pending_result_cache[key] = result
|
||||
|
||||
def on_complete(r):
|
||||
# if this cache has a non-zero timeout, and the callback has not cleared
|
||||
# the should_cache bit, we leave it in the cache for now and schedule
|
||||
# its removal later.
|
||||
if self.timeout_sec and context.should_cache:
|
||||
for key in keys:
|
||||
# TODO sketch, should do this in only one call_later.
|
||||
self.clock.call_later(
|
||||
self.timeout_sec, self.pending_result_cache.pop, key, None
|
||||
)
|
||||
else:
|
||||
for key in keys:
|
||||
# otherwise, remove the result immediately.
|
||||
self.pending_result_cache.pop(key, None)
|
||||
return r
|
||||
|
||||
# make sure we do this *after* adding the entry to pending_result_cache,
|
||||
# in case the result is already complete (in which case flipping the order would
|
||||
# leave us with a stuck entry in the cache).
|
||||
result.addBoth(on_complete)
|
||||
return result.observe()
|
||||
|
||||
def set_and_compute(
|
||||
self,
|
||||
keys: Tuple[KV, ...],
|
||||
callback: Callable[..., Awaitable[RV]],
|
||||
*args: Any,
|
||||
cache_context: bool = False,
|
||||
**kwargs: Any,
|
||||
) -> "defer.Deferred[RV]":
|
||||
"""Perform a *set* call, taking care of logcontexts
|
||||
|
||||
Makes a call to *callback(*args, **kwargs)*, which should
|
||||
follow the synapse logcontext rules, and adds the result to the cache.
|
||||
|
||||
Example usage:
|
||||
|
||||
async def handle_request(request):
|
||||
# etc
|
||||
return result
|
||||
|
||||
result = await response_cache.wrap(
|
||||
key,
|
||||
handle_request,
|
||||
request,
|
||||
)
|
||||
|
||||
Args:
|
||||
keys: keys to get/set in the cache
|
||||
|
||||
callback: function to call
|
||||
|
||||
*args: positional parameters to pass to the callback, if it is used
|
||||
|
||||
cache_context: if set, the callback will be given a `cache_context` kw arg,
|
||||
which will be a ResponseCacheContext object.
|
||||
|
||||
**kwargs: named parameters to pass to the callback, if it is used
|
||||
|
||||
Returns:
|
||||
The result of the callback (from the cache, or otherwise)
|
||||
"""
|
||||
|
||||
# TODO sketch logger.debug(
|
||||
# "[%s]: no cached result for [%s], calculating new one", self._name, key
|
||||
# )
|
||||
context = MultiKeyResponseCacheContext(cache_keys=keys)
|
||||
if cache_context:
|
||||
kwargs["cache_context"] = context
|
||||
d = run_in_background(callback, *args, **kwargs)
|
||||
result = self._set(context, d)
|
||||
|
||||
return make_deferred_yieldable(result)
|
||||
@@ -14,6 +14,8 @@
|
||||
|
||||
import logging
|
||||
|
||||
from frozendict import frozendict
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.storage.state import StateFilter
|
||||
@@ -183,7 +185,9 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.storage.state.get_state_for_event(
|
||||
e5.event_id,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: {self.u_alice.to_string()}},
|
||||
types=frozendict(
|
||||
{EventTypes.Member: frozenset({self.u_alice.to_string()})}
|
||||
),
|
||||
include_others=True,
|
||||
),
|
||||
)
|
||||
@@ -203,7 +207,8 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.storage.state.get_state_for_event(
|
||||
e5.event_id,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: set()}, include_others=True
|
||||
types=frozendict({EventTypes.Member: frozenset()}),
|
||||
include_others=True,
|
||||
),
|
||||
)
|
||||
)
|
||||
@@ -228,7 +233,7 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: set()}, include_others=True
|
||||
types=frozendict({EventTypes.Member: frozenset()}), include_others=True
|
||||
),
|
||||
)
|
||||
|
||||
@@ -245,7 +250,7 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_members_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: set()}, include_others=True
|
||||
types=frozendict({EventTypes.Member: frozenset()}), include_others=True
|
||||
),
|
||||
)
|
||||
|
||||
@@ -258,7 +263,7 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: None}, include_others=True
|
||||
types=frozendict({EventTypes.Member: None}), include_others=True
|
||||
),
|
||||
)
|
||||
|
||||
@@ -275,7 +280,7 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_members_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: None}, include_others=True
|
||||
types=frozendict({EventTypes.Member: None}), include_others=True
|
||||
),
|
||||
)
|
||||
|
||||
@@ -295,7 +300,8 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: {e5.state_key}}, include_others=True
|
||||
types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
|
||||
include_others=True,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -312,7 +318,8 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_members_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: {e5.state_key}}, include_others=True
|
||||
types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
|
||||
include_others=True,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -325,7 +332,8 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_members_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: {e5.state_key}}, include_others=False
|
||||
types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
|
||||
include_others=False,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -375,7 +383,7 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: set()}, include_others=True
|
||||
types=frozendict({EventTypes.Member: frozenset()}), include_others=True
|
||||
),
|
||||
)
|
||||
|
||||
@@ -387,7 +395,7 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_members_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: set()}, include_others=True
|
||||
types=frozendict({EventTypes.Member: frozenset()}), include_others=True
|
||||
),
|
||||
)
|
||||
|
||||
@@ -400,7 +408,7 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: None}, include_others=True
|
||||
types=frozendict({EventTypes.Member: None}), include_others=True
|
||||
),
|
||||
)
|
||||
|
||||
@@ -411,7 +419,7 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_members_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: None}, include_others=True
|
||||
types=frozendict({EventTypes.Member: None}), include_others=True
|
||||
),
|
||||
)
|
||||
|
||||
@@ -430,7 +438,8 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: {e5.state_key}}, include_others=True
|
||||
types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
|
||||
include_others=True,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -441,7 +450,8 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_members_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: {e5.state_key}}, include_others=True
|
||||
types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
|
||||
include_others=True,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -454,7 +464,8 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: {e5.state_key}}, include_others=False
|
||||
types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
|
||||
include_others=False,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -465,7 +476,8 @@ class StateStoreTestCase(HomeserverTestCase):
|
||||
self.state_datastore._state_group_members_cache,
|
||||
group,
|
||||
state_filter=StateFilter(
|
||||
types={EventTypes.Member: {e5.state_key}}, include_others=False
|
||||
types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
|
||||
include_others=False,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user