Mirror of https://github.com/element-hq/synapse.git (synced 2025-12-05 01:10:13 +00:00)

Compare commits: dmr/storag ... rei/gsgfg2 (8 commits)
Commits (SHA1):

  20fcc711e8
  9986edba00
  699f2197e3
  115970d0d7
  fda00e102b
  967427c1b7
  92253361c4
  471266d0fd
changelog.d/10681.misc (new file, 1 line)
@@ -0,0 +1 @@
+Deduplicate requests in `_get_state_for_groups`.
mypy.ini (1 line added)
@@ -96,6 +96,7 @@ files =
   tests/handlers/test_sync.py,
   tests/rest/client/test_login.py,
   tests/rest/client/test_auth.py,
+  tests/storage/test_state.py,
   tests/util/test_itertools.py,
   tests/util/test_stream_change_cache.py
synapse/storage/databases/state/store.py
@@ -14,21 +14,31 @@
 
 import logging
 from collections import namedtuple
-from typing import Dict, Iterable, List, Optional, Set, Tuple
+from typing import Dict, FrozenSet, Iterable, List, Optional, Set, Tuple, Union
 
+import attr
+
+from twisted.internet.defer import Deferred
+
 from synapse.api.constants import EventTypes
+from synapse.logging.context import make_deferred_yieldable
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
 from synapse.storage.state import StateFilter
 from synapse.storage.types import Cursor
 from synapse.storage.util.sequence import build_sequence_generator
-from synapse.types import MutableStateMap, StateMap
+from synapse.types import MutableStateMap, StateKey, StateMap
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.dictionary_cache import DictionaryCache
+from synapse.util.caches.multi_key_response_cache import MultiKeyResponseCache
 
 logger = logging.getLogger(__name__)
 
+InflightStateGroupCacheKey = Union[
+    Tuple[int, StateFilter], Tuple[int, str, Optional[str]]
+]
+
 
 MAX_STATE_DELTA_HOPS = 100
 
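The `InflightStateGroupCacheKey` alias above admits two key shapes: an exact-filter key, and a per-(event type, state key) key where `None` in the last slot stands for "all state keys of this type". A minimal sketch of both shapes for a single state group (hypothetical identifiers, not part of the changeset):

    from synapse.storage.state import StateFilter

    group = 42  # a state group ID

    # Shape 1: (state group, StateFilter) - keys the request by the exact
    # (expanded) filter it was issued with.
    exact_key = (group, StateFilter.from_types([("m.room.member", "@alice:hs")]))

    # Shape 2: (state group, event type, state key) - one key per concrete
    # pair the request will fetch; None means "every state key of this type".
    pair_key = (group, "m.room.member", "@alice:hs")
    wildcard_key = (group, "m.room.member", None)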
@@ -81,16 +91,29 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         # We size the non-members cache to be smaller than the members cache as the
         # vast majority of state in Matrix (today) is member events.
-        self._state_group_cache = DictionaryCache(
+        self._state_group_cache: DictionaryCache[int, StateKey] = DictionaryCache(
             "*stateGroupCache*",
             # TODO: this hasn't been tuned yet
             50000,
         )
-        self._state_group_members_cache = DictionaryCache(
+        self._state_group_members_cache: DictionaryCache[
+            int, StateKey
+        ] = DictionaryCache(
             "*stateGroupMembersCache*",
             500000,
         )
 
+        self._state_group_inflight_cache: MultiKeyResponseCache[
+            InflightStateGroupCacheKey, Dict[int, StateMap[str]]
+        ] = MultiKeyResponseCache(
+            self.hs.get_clock(),
+            "*stateGroupInflightCache*",
+            # As the results from this transaction immediately go into the
+            # immediate caches _state_group_cache and _state_group_members_cache,
+            # we do not keep them in the in-flight cache when done.
+            timeout_ms=0,
+        )
+
         def get_max_state_group_txn(txn: Cursor):
             txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
             return txn.fetchone()[0]
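Because fetched rows are written straight into the two `DictionaryCache`s, the in-flight cache is constructed with `timeout_ms=0`: entries vanish the moment their computation completes, so it only ever deduplicates concurrent work. A minimal asyncio sketch of that eviction policy (illustrative only; Synapse itself uses Twisted deferreds):

    import asyncio
    from typing import Any, Awaitable, Callable, Dict

    class InflightOnlyCache:
        """Shares in-progress computations; keeps nothing once they finish."""

        def __init__(self) -> None:
            self._pending: Dict[Any, "asyncio.Future[Any]"] = {}

        async def wrap(self, key: Any, compute: Callable[[], Awaitable[Any]]) -> Any:
            fut = self._pending.get(key)
            if fut is None:
                fut = asyncio.ensure_future(compute())
                self._pending[key] = fut
                # Evict as soon as the computation settles: the equivalent
                # of timeout_ms=0 above.
                fut.add_done_callback(lambda _: self._pending.pop(key, None))
            return await fut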
@@ -168,13 +191,18 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
         return results
 
-    def _get_state_for_group_using_cache(self, cache, group, state_filter):
+    def _get_state_for_group_using_cache(
+        self,
+        cache: DictionaryCache[int, StateKey],
+        group: int,
+        state_filter: StateFilter,
+    ) -> Tuple[MutableStateMap[str], bool]:
         """Checks if group is in cache. See `_get_state_for_groups`
 
         Args:
-            cache(DictionaryCache): the state group cache to use
-            group(int): The state group to lookup
-            state_filter (StateFilter): The state filter used to fetch state
+            cache: the state group cache to use
+            group: The state group to lookup
+            state_filter: The state filter used to fetch state
                 from the database.
 
         Returns 2-tuple (`state_dict`, `got_all`).
@@ -212,7 +240,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
     ) -> Dict[int, MutableStateMap[str]]:
         """Gets the state at each of a list of state groups, optionally
-        filtering by type/state_key
+        filtering by type/state_key.
 
         Args:
             groups: list of state groups for which we want
@@ -221,11 +249,83 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
                 from the database.
         Returns:
             Dict of state group to state map.
-        """
-        state_filter = state_filter or StateFilter.all()
-
+
+        The flow for this function looks as follows:
+
+           * Query the immediate caches (self._state_group_cache,
+           | self._state_group_members_cache).
+   NONSTOP |
+           |
+           * Query the in-flight cache (self._state_group_inflight_cache)
+           | for immediate-cache misses.
+   NONSTOP |
+           |
+           * Service cache misses:
+           | - Expand the state filter (to help cache hit ratio).
+           | - Start a new transaction to fetch outstanding groups.
+           | - Register entries in the in-flight cache for this transaction.
+           | - (When the transaction is finished) Register entries in
+           |   the immediate caches.
+           |
+           * Wait for in-flight requests to finish...
+           |
+           * Assemble everything together and filter out anything we didn't
+             ask for.
+
+        The sections marked NONSTOP must not contain any `await`s, otherwise
+        race conditions could occur and the cache could be made less effective.
+        """
+
+        def try_combine_inflight_requests(
+            group: int,
+            state_filter: StateFilter,
+            mut_inflight_requests: "List[Tuple[int, Deferred[Dict[int, StateMap[str]]]]]",
+        ) -> bool:
+            """
+            Tries to collect existing in-flight requests that would give us all
+            the desired state for the given group.
+
+            Returns true if successful, or false if not.
+            If successful, adds more in-flight requests to the `mut_inflight_requests` list.
+            """
+            original_inflight_requests = len(mut_inflight_requests)
+            for event_type, state_keys in state_filter.types.items():
+                # First see if any requests are looking up ALL state keys for this
+                # event type.
+                result = self._state_group_inflight_cache.get((group, event_type, None))
+                if result is not None:
+                    inflight_requests.append((group, make_deferred_yieldable(result)))
+                    continue
+
+                if state_keys is None:
+                    # We want all state keys, but there isn't a request in-flight
+                    # wanting them all, so we have to give up here.
+                    del mut_inflight_requests[original_inflight_requests:]
+                    return False
+                else:
+                    # If we are only interested in certain state keys,
+                    # we can see if other in-flight requests would manage to
+                    # give us all the wanted state keys.
+                    for state_key in state_keys:
+                        result = self._state_group_inflight_cache.get(
+                            (group, event_type, state_key)
+                        )
+                        if result is None:
+                            # There isn't an in-flight request already requesting
+                            # this, so give up here.
+                            del mut_inflight_requests[original_inflight_requests:]
+                            return False
+
+                        inflight_requests.append(
+                            (group, make_deferred_yieldable(result))
+                        )
+            return True
+
+        state_filter = state_filter or StateFilter.all()
+
         member_filter, non_member_filter = state_filter.get_member_split()
 
+        # QUERY THE IMMEDIATE CACHES
         # Now we look them up in the member and non-member caches
         (
             non_member_state,
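A worked example of `try_combine_inflight_requests` (hypothetical cache contents): suppose the in-flight cache holds entries under `(42, "m.room.member", "@a:hs")` and `(42, "m.room.member", "@b:hs")`, and a caller asks group 42 for exactly those two members:

    wanted = StateFilter.from_types(
        [("m.room.member", "@a:hs"), ("m.room.member", "@b:hs")]
    )

Every concrete (type, state key) pair has an in-flight entry, so the helper appends both deferreds to the request list and returns True: no new database transaction is needed for this group. Had either pair been absent, the partial matches would be rolled back by the `del mut_inflight_requests[...]` slice and group 42 would fall through to `inflight_cache_misses`.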
@@ -242,43 +342,131 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         for group in groups:
             state[group].update(member_state[group])
 
-        # Now fetch any missing groups from the database
-
         incomplete_groups = incomplete_groups_m | incomplete_groups_nm
 
         if not incomplete_groups:
             return state
 
-        cache_sequence_nm = self._state_group_cache.sequence
-        cache_sequence_m = self._state_group_members_cache.sequence
+        # QUERY THE IN-FLIGHT CACHE
+        # list (group ID -> Deferred that will contain a result for that group)
+        inflight_requests: List[Tuple[int, Deferred[Dict[int, StateMap[str]]]]] = []
+        inflight_cache_misses: List[int] = []
 
-        # Help the cache hit ratio by expanding the filter a bit
+        # When we get around to requesting state from the database, we help the
+        # cache hit ratio by expanding the filter a bit.
+        # However, we need to know this now so that we can properly query the
+        # in-flight cache where include_others is concerned.
         db_state_filter = state_filter.return_expanded()
 
-        group_to_state_dict = await self._get_state_groups_from_groups(
-            list(incomplete_groups), state_filter=db_state_filter
-        )
+        for group in incomplete_groups:
+            event_type: str
+            state_keys: Optional[FrozenSet[str]]
 
-        # Now lets update the caches
-        self._insert_into_cache(
-            group_to_state_dict,
-            db_state_filter,
-            cache_seq_num_members=cache_sequence_m,
-            cache_seq_num_non_members=cache_sequence_nm,
-        )
+            # First check if our exact state filter is being looked up.
+            result = self._state_group_inflight_cache.get((group, db_state_filter))
+            if result is not None:
+                inflight_requests.append((group, make_deferred_yieldable(result)))
+                continue
+
+            # Then check if the universal state filter is being looked up.
+            result = self._state_group_inflight_cache.get((group, StateFilter.all()))
+            if result is not None:
+                inflight_requests.append((group, make_deferred_yieldable(result)))
+                continue
+
+            if state_filter.include_others:
+                # if the state filter includes others, we only match against the
+                # state filter directly, so we give up here.
+                # This is because it's too complex to cache this case properly.
+                inflight_cache_misses.append(group)
+                continue
+            elif not db_state_filter.include_others:
+                # Try looking to see if the same filter but with include_others
+                # is being looked up.
+                result = self._state_group_inflight_cache.get(
+                    (group, attr.evolve(db_state_filter, include_others=True))
+                )
+                if result is not None:
+                    inflight_requests.append((group, make_deferred_yieldable(result)))
+                    continue
+
+            if try_combine_inflight_requests(group, state_filter, inflight_requests):
+                # succeeded in finding in-flight requests that could be combined
+                # together to give all the state we need for this group.
+                continue
+
+            inflight_cache_misses.append(group)
+
+        # SERVICE CACHE MISSES
+        if inflight_cache_misses:
+            cache_sequence_nm = self._state_group_cache.sequence
+            cache_sequence_m = self._state_group_members_cache.sequence
+
+            async def get_state_groups_from_groups_then_add_to_cache() -> Dict[
+                int, StateMap[str]
+            ]:
+                groups_to_state_dict = await self._get_state_groups_from_groups(
+                    list(inflight_cache_misses), state_filter=db_state_filter
+                )
+
+                # Now let's update the caches.
+                self._insert_into_cache(
+                    groups_to_state_dict,
+                    db_state_filter,
+                    cache_seq_num_members=cache_sequence_m,
+                    cache_seq_num_non_members=cache_sequence_nm,
+                )
+
+                return groups_to_state_dict
+
+            # Make a list of keys for us to store in the in-flight cache.
+            # This should list all the keys that the request will pick up from
+            # the database.
+            keys: List[InflightStateGroupCacheKey] = []
+            for group in inflight_cache_misses:
+                if db_state_filter.include_others:
+                    # We can't properly add cache keys for all the 'other'
+                    # state keys that `include_others` specifies (since there are
+                    # an unlimited number of 'other' state keys), but we can
+                    # add a cache key with the exact state filter in use
+                    # (in addition to cache keys specifying the definite state
+                    # keys we are requesting).
+                    keys.append((group, db_state_filter))
+
+                for event_type, state_keys in db_state_filter.types.items():
+                    if state_keys is None:
+                        keys.append((group, event_type, None))
+                    else:
+                        for state_key in state_keys:
+                            keys.append((group, event_type, state_key))
+
+            spawned_request = self._state_group_inflight_cache.set_and_compute(
+                tuple(keys), get_state_groups_from_groups_then_add_to_cache
+            )
+            for group in inflight_cache_misses:
+                inflight_requests.append((group, spawned_request))
+
+        # WAIT FOR IN-FLIGHT REQUESTS TO FINISH
+        for group, inflight_request in inflight_requests:
+            request_result = await inflight_request
+            state[group].update(request_result[group])
 
+        # ASSEMBLE
         # And finally update the result dict, by filtering out any extra
         # stuff we pulled out of the database.
-        for group, group_state_dict in group_to_state_dict.items():
+        for group in groups:
             # We just replace any existing entries, as we will have loaded
             # everything we need from the database anyway.
-            state[group] = state_filter.filter_state(group_state_dict)
+            state[group] = state_filter.filter_state(state[group])
 
         return state
 
     def _get_state_for_groups_using_cache(
-        self, groups: Iterable[int], cache: DictionaryCache, state_filter: StateFilter
-    ) -> Tuple[Dict[int, StateMap[str]], Set[int]]:
+        self,
+        groups: Iterable[int],
+        cache: DictionaryCache[int, StateKey],
+        state_filter: StateFilter,
+    ) -> Tuple[Dict[int, MutableStateMap[str]], Set[int]]:
         """Gets the state at each of a list of state groups, optionally
         filtering by type/state_key, querying from a specific cache.
 
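The net effect of the rewritten `_get_state_for_groups` is that concurrent callers wanting overlapping state share a single `_get_state_groups_from_groups` transaction, and each caller then trims the shared result back down with `filter_state`. A runnable sketch of that deduplication property, in plain asyncio rather than Twisted (all names hypothetical):

    import asyncio
    from typing import Any, Dict

    async def main() -> None:
        calls = 0
        pending: Dict[int, "asyncio.Future[Any]"] = {}

        async def fake_db_fetch(group: int) -> dict:
            nonlocal calls
            calls += 1
            await asyncio.sleep(0.01)  # simulated transaction latency
            return {("m.room.member", "@a:hs"): "$event"}

        async def get_state(group: int) -> dict:
            # The "in-flight cache" lookup: reuse a pending fetch if one exists.
            fut = pending.get(group)
            if fut is None:
                fut = asyncio.ensure_future(fake_db_fetch(group))
                pending[group] = fut
                fut.add_done_callback(lambda _: pending.pop(group, None))
            return await fut

        r1, r2 = await asyncio.gather(get_state(42), get_state(42))
        assert r1 == r2
        assert calls == 1  # one database fetch served both callers

    asyncio.run(main())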
synapse/storage/state.py
@@ -25,12 +25,15 @@ from typing import (
 )
 
 import attr
+from frozendict import frozendict
 
 from synapse.api.constants import EventTypes
 from synapse.events import EventBase
 from synapse.types import MutableStateMap, StateMap
 
 if TYPE_CHECKING:
+    from typing import FrozenSet  # noqa: used within quoted type hint; flake8 sad
+
     from synapse.server import HomeServer
     from synapse.storage.databases import Databases
 
@@ -40,7 +43,7 @@ logger = logging.getLogger(__name__)
 T = TypeVar("T")
 
 
-@attr.s(slots=True)
+@attr.s(slots=True, frozen=True)
 class StateFilter:
     """A filter used when querying for state.
 
@@ -53,14 +56,19 @@ class StateFilter:
         appear in `types`.
     """
 
-    types = attr.ib(type=Dict[str, Optional[Set[str]]])
+    types = attr.ib(type="frozendict[str, Optional[FrozenSet[str]]]")
     include_others = attr.ib(default=False, type=bool)
 
     def __attrs_post_init__(self):
         # If `include_others` is set we canonicalise the filter by removing
         # wildcards from the types dictionary
         if self.include_others:
-            self.types = {k: v for k, v in self.types.items() if v is not None}
+            # REVIEW: yucky
+            object.__setattr__(
+                self,
+                "types",
+                frozendict({k: v for k, v in self.types.items() if v is not None}),
+            )
 
     @staticmethod
     def all() -> "StateFilter":
@@ -69,7 +77,7 @@ class StateFilter:
         Returns:
             The new state filter.
         """
-        return StateFilter(types={}, include_others=True)
+        return StateFilter(types=frozendict(), include_others=True)
 
     @staticmethod
     def none() -> "StateFilter":
@@ -78,7 +86,7 @@ class StateFilter:
         Returns:
             The new state filter.
         """
-        return StateFilter(types={}, include_others=False)
+        return StateFilter(types=frozendict(), include_others=False)
 
     @staticmethod
     def from_types(types: Iterable[Tuple[str, Optional[str]]]) -> "StateFilter":
@@ -103,7 +111,12 @@ class StateFilter:
 
             type_dict.setdefault(typ, set()).add(s)  # type: ignore
 
-        return StateFilter(types=type_dict)
+        return StateFilter(
+            types=frozendict(
+                (k, frozenset(v) if v is not None else None)
+                for k, v in type_dict.items()
+            )
+        )
 
     @staticmethod
     def from_lazy_load_member_list(members: Iterable[str]) -> "StateFilter":
@@ -116,7 +129,10 @@ class StateFilter:
         Returns:
             The new state filter
         """
-        return StateFilter(types={EventTypes.Member: set(members)}, include_others=True)
+        return StateFilter(
+            types=frozendict({EventTypes.Member: frozenset(members)}),
+            include_others=True,
+        )
 
     def return_expanded(self) -> "StateFilter":
         """Creates a new StateFilter where type wild cards have been removed
@@ -173,7 +189,7 @@ class StateFilter:
                 # We want to return all non-members, but only particular
                 # memberships
                 return StateFilter(
-                    types={EventTypes.Member: self.types[EventTypes.Member]},
+                    types=frozendict({EventTypes.Member: self.types[EventTypes.Member]}),
                     include_others=True,
                 )
 
@@ -245,14 +261,15 @@ class StateFilter:
 
         return len(self.concrete_types())
 
-    def filter_state(self, state_dict: StateMap[T]) -> StateMap[T]:
-        """Returns the state filtered with by this StateFilter
+    def filter_state(self, state_dict: StateMap[T]) -> MutableStateMap[T]:
+        """Returns the state filtered with by this StateFilter.
 
         Args:
             state: The state map to filter
 
         Returns:
-            The filtered state map
+            The filtered state map.
+            This is a copy, so it's safe to mutate.
         """
         if self.is_full():
             return dict(state_dict)
@@ -324,14 +341,16 @@ class StateFilter:
             if state_keys is None:
                 member_filter = StateFilter.all()
             else:
-                member_filter = StateFilter({EventTypes.Member: state_keys})
+                member_filter = StateFilter(frozendict({EventTypes.Member: state_keys}))
         elif self.include_others:
             member_filter = StateFilter.all()
         else:
             member_filter = StateFilter.none()
 
         non_member_filter = StateFilter(
-            types={k: v for k, v in self.types.items() if k != EventTypes.Member},
+            types=frozendict(
+                {k: v for k, v in self.types.items() if k != EventTypes.Member}
+            ),
             include_others=self.include_others,
         )
 
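The move to `@attr.s(frozen=True)` with `frozendict`/`frozenset` fields is what makes a `StateFilter` hashable, which the store-side change relies on when it uses `(group, db_state_filter)` tuples as in-flight cache keys. A minimal sketch of the property being relied on (assuming the patched class and the `frozendict` package):

    from frozendict import frozendict

    from synapse.storage.state import StateFilter

    f1 = StateFilter(types=frozendict({"m.room.member": None}), include_others=False)
    f2 = StateFilter(types=frozendict({"m.room.member": None}), include_others=False)

    # With eq and frozen both set, attrs generates __hash__, so equal
    # filters can be used interchangeably as dict keys:
    inflight = {(42, f1): "deferred result"}
    assert (42, f2) in inflight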
synapse/util/caches/multi_key_response_cache.py (new file, 210 lines)
@@ -0,0 +1,210 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import Any, Awaitable, Callable, Dict, Generic, Optional, Tuple, TypeVar
+
+import attr
+
+from twisted.internet import defer
+
+from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.util import Clock
+from synapse.util.async_helpers import ObservableDeferred
+from synapse.util.caches import register_cache
+
+logger = logging.getLogger(__name__)
+
+# the type of the key in the cache
+KV = TypeVar("KV")
+
+# the type of the result from the operation
+RV = TypeVar("RV")
+
+
+@attr.s(auto_attribs=True)
+class MultiKeyResponseCacheContext(Generic[KV]):
+    """Information about a missed MultiKeyResponseCache hit
+
+    This object can be passed into the callback for additional feedback
+    """
+
+    cache_keys: Tuple[KV, ...]
+    """The cache key that caused the cache miss
+
+    This should be considered read-only.
+
+    TODO: in attrs 20.1, make it frozen with an on_setattr.
+    """
+
+    should_cache: bool = True
+    """Whether the result should be cached once the request completes.
+
+    This can be modified by the callback if it decides its result should not be cached.
+    """
+
+
+class MultiKeyResponseCache(Generic[KV, RV]):
+    """
+    This caches a deferred response. Until the deferred completes it will be
+    returned from the cache. This means that if the client retries the request
+    while the response is still being computed, that original response will be
+    used rather than trying to compute a new response.
+
+    Unlike the plain ResponseCache, this cache admits multiple keys to the
+    deferred response.
+    """
+
+    def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
+        # This is poorly-named: it includes both complete and incomplete results.
+        # We keep complete results rather than switching to absolute values because
+        # that makes it easier to cache Failure results.
+        self.pending_result_cache: Dict[KV, ObservableDeferred[RV]] = {}
+
+        self.clock = clock
+        self.timeout_sec = timeout_ms / 1000.0
+
+        self._name = name
+        self._metrics = register_cache(
+            "multikey_response_cache", name, self, resizable=False
+        )
+
+    def size(self) -> int:
+        return len(self.pending_result_cache)
+
+    def __len__(self) -> int:
+        return self.size()
+
+    def get(self, key: KV) -> "Optional[defer.Deferred[RV]]":
+        """Look up the given key.
+
+        Returns a new Deferred (which also doesn't follow the synapse
+        logcontext rules). You will probably want to make_deferred_yieldable the result.
+
+        If there is no entry for the key, returns None.
+
+        Args:
+            key: key to get/set in the cache
+
+        Returns:
+            None if there is no entry for this key; otherwise a deferred which
+            resolves to the result.
+        """
+        result = self.pending_result_cache.get(key)
+        if result is not None:
+            self._metrics.inc_hits()
+            return result.observe()
+        else:
+            self._metrics.inc_misses()
+            return None
+
+    def _set(
+        self, context: MultiKeyResponseCacheContext[KV], deferred: "defer.Deferred[RV]"
+    ) -> "defer.Deferred[RV]":
+        """Set the entry for the given key to the given deferred.
+
+        *deferred* should run its callbacks in the sentinel logcontext (ie,
+        you should wrap normal synapse deferreds with
+        synapse.logging.context.run_in_background).
+
+        Returns a new Deferred (which also doesn't follow the synapse logcontext rules).
+        You will probably want to make_deferred_yieldable the result.
+
+        Args:
+            context: Information about the cache miss
+            deferred: The deferred which resolves to the result.
+
+        Returns:
+            A new deferred which resolves to the actual result.
+        """
+        result = ObservableDeferred(deferred, consumeErrors=True)
+        keys = context.cache_keys
+        for key in keys:
+            if key not in self.pending_result_cache:
+                # we only add the key if it's not already there, since we assume
+                # that we won't overtake prior entries.
+                self.pending_result_cache[key] = result
+
+        def on_complete(r):
+            # if this cache has a non-zero timeout, and the callback has not cleared
+            # the should_cache bit, we leave it in the cache for now and schedule
+            # its removal later.
+            if self.timeout_sec and context.should_cache:
+                for key in keys:
+                    # TODO sketch, should do this in only one call_later.
+                    self.clock.call_later(
+                        self.timeout_sec, self.pending_result_cache.pop, key, None
+                    )
+            else:
+                for key in keys:
+                    # otherwise, remove the result immediately.
+                    self.pending_result_cache.pop(key, None)
+            return r
+
+        # make sure we do this *after* adding the entry to pending_result_cache,
+        # in case the result is already complete (in which case flipping the order would
+        # leave us with a stuck entry in the cache).
+        result.addBoth(on_complete)
+        return result.observe()
+
+    def set_and_compute(
+        self,
+        keys: Tuple[KV, ...],
+        callback: Callable[..., Awaitable[RV]],
+        *args: Any,
+        cache_context: bool = False,
+        **kwargs: Any,
+    ) -> "defer.Deferred[RV]":
+        """Perform a *set* call, taking care of logcontexts
+
+        Makes a call to *callback(*args, **kwargs)*, which should
+        follow the synapse logcontext rules, and adds the result to the cache.
+
+        Example usage:
+
+            async def handle_request(request):
+                # etc
+                return result
+
+            result = await response_cache.wrap(
+                key,
+                handle_request,
+                request,
+            )
+
+        Args:
+            keys: keys to get/set in the cache
+
+            callback: function to call
+
+            *args: positional parameters to pass to the callback, if it is used
+
+            cache_context: if set, the callback will be given a `cache_context` kw arg,
+                which will be a ResponseCacheContext object.
+
+            **kwargs: named parameters to pass to the callback, if it is used
+
+        Returns:
+            The result of the callback (from the cache, or otherwise)
+        """
+        # TODO sketch logger.debug(
+        #     "[%s]: no cached result for [%s], calculating new one", self._name, key
+        # )
+        context = MultiKeyResponseCacheContext(cache_keys=keys)
+        if cache_context:
+            kwargs["cache_context"] = context
+        d = run_in_background(callback, *args, **kwargs)
+        result = self._set(context, d)
+        return make_deferred_yieldable(result)
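A sketch of the intended calling pattern for the new cache, mirroring its use in `_get_state_for_groups` above; the `clock`, keys and callback here are hypothetical stand-ins, and `set_and_compute` is the entry point:

    from synapse.logging.context import make_deferred_yieldable
    from synapse.util.caches.multi_key_response_cache import MultiKeyResponseCache

    # e.g. clock = hs.get_clock(); timeout_ms=0 keeps entries only while
    # the computation is in flight.
    cache: MultiKeyResponseCache[str, int] = MultiKeyResponseCache(
        clock, "exampleCache", timeout_ms=0
    )

    async def expensive_lookup() -> int:
        return 42  # stand-in for a database transaction

    async def caller() -> int:
        # Register one computation under several keys at once.
        d = cache.set_and_compute(("key-a", "key-b"), expensive_lookup)
        # A concurrent caller looking up either key observes the same
        # computation instead of starting its own.
        assert cache.get("key-b") is not None
        return await make_deferred_yieldable(d)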
tests/storage/test_state.py
@@ -14,6 +14,8 @@
 
 import logging
 
+from frozendict import frozendict
+
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.room_versions import RoomVersions
 from synapse.storage.state import StateFilter
@@ -183,7 +185,9 @@ class StateStoreTestCase(HomeserverTestCase):
             self.storage.state.get_state_for_event(
                 e5.event_id,
                 state_filter=StateFilter(
-                    types={EventTypes.Member: {self.u_alice.to_string()}},
+                    types=frozendict(
+                        {EventTypes.Member: frozenset({self.u_alice.to_string()})}
+                    ),
                     include_others=True,
                 ),
             )
@@ -203,7 +207,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.storage.state.get_state_for_event(
                 e5.event_id,
                 state_filter=StateFilter(
-                    types={EventTypes.Member: set()}, include_others=True
+                    types=frozendict({EventTypes.Member: frozenset()}),
+                    include_others=True,
                 ),
             )
         )
@@ -228,7 +233,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: set()}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset()}), include_others=True
             ),
         )
 
@@ -245,7 +250,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: set()}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset()}), include_others=True
            ),
         )
 
@@ -258,7 +263,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: None}, include_others=True
+                types=frozendict({EventTypes.Member: None}), include_others=True
             ),
         )
 
@@ -275,7 +280,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: None}, include_others=True
+                types=frozendict({EventTypes.Member: None}), include_others=True
             ),
         )
 
@@ -295,7 +300,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=True,
             ),
         )
 
@@ -312,7 +318,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=True,
             ),
         )
 
@@ -325,7 +332,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=False
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=False,
             ),
         )
 
@@ -375,7 +383,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: set()}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset()}), include_others=True
             ),
         )
 
@@ -387,7 +395,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: set()}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset()}), include_others=True
             ),
         )
 
@@ -400,7 +408,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: None}, include_others=True
+                types=frozendict({EventTypes.Member: None}), include_others=True
             ),
         )
 
@@ -411,7 +419,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: None}, include_others=True
+                types=frozendict({EventTypes.Member: None}), include_others=True
             ),
         )
 
@@ -430,7 +438,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=True,
             ),
         )
 
@@ -441,7 +450,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=True,
            ),
         )
 
@@ -454,7 +464,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=False
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=False,
             ),
         )
 
@@ -465,7 +476,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=False
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=False,
             ),
         )