Mirror of https://github.com/element-hq/synapse.git

Compare commits: madlittlem...rei/gsgfg (13 commits)
| SHA1 |
|---|
| 2ccbea4107 |
| 0736840f1e |
| 4bb7bc8ffd |
| 990f3b5003 |
| bbb0473cd0 |
| dcb6fc5023 |
| 215019cd66 |
| 2f7eeefa4b |
| 5fa9110c24 |
| b09de10dff |
| ae9d273534 |
| cefcab7734 |
| 507cafc2c3 |
changelog.d/10510.misc (new file)

@@ -0,0 +1 @@
+Make _get_state_groups_from_groups use caching (for each individual group to query).
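The entry above describes the shape of the change: rather than fetching state for a whole batch of groups in one uncacheable query, each group is fetched on its own so the result can be cached per group. A minimal sketch of that idea (hypothetical helper names, not Synapse's actual code):

```python
from typing import Awaitable, Callable, Dict, Iterable


async def get_state_for_groups(
    groups: Iterable[int],
    fetch_group_state: Callable[[int], Awaitable[dict]],
    cache: Dict[int, dict],
) -> Dict[int, dict]:
    """Fetch state per group so each group's result is individually cacheable."""
    results = {}
    for group in groups:
        if group not in cache:
            # One query per group, instead of one query per 100-group chunk.
            cache[group] = await fetch_group_state(group)
        results[group] = cache[group]
    return results
```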
mypy.ini

@@ -88,6 +88,7 @@ files =
   tests/handlers/test_password_providers.py,
   tests/rest/client/v1/test_login.py,
   tests/rest/client/v2_alpha/test_auth.py,
+  tests/storage/test_state.py,
   tests/util/test_itertools.py,
   tests/util/test_stream_change_cache.py
synapse/storage/databases/state/bg_updates.py (path inferred from the classes below)

@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import logging
+import typing
 from typing import Optional
 
 from synapse.storage._base import SQLBaseStore
@@ -20,6 +21,9 @@ from synapse.storage.database import DatabasePool
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.state import StateFilter
 
+if typing.TYPE_CHECKING:
+    from synapse.types import StateMap
+
 logger = logging.getLogger(__name__)
 
 
@@ -72,12 +76,12 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
 
         return count
 
-    def _get_state_groups_from_groups_txn(
-        self, txn, groups, state_filter: Optional[StateFilter] = None
-    ):
+    def _get_state_groups_from_group_txn(
+        self, txn, group: int, state_filter: Optional[StateFilter] = None
+    ) -> "StateMap[str]":
         state_filter = state_filter or StateFilter.all()
 
-        results = {group: {} for group in groups}
+        result = {}
 
         where_clause, where_args = state_filter.make_sql_filter_clause()
 
@@ -116,64 +120,62 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
                 ORDER BY type, state_key, state_group DESC
             """
 
-            for group in groups:
-                args = [group]
-                args.extend(where_args)
+            args = [group]
+            args.extend(where_args)
 
-                txn.execute(sql % (where_clause,), args)
-                for row in txn:
-                    typ, state_key, event_id = row
-                    key = (typ, state_key)
-                    results[group][key] = event_id
+            txn.execute(sql % (where_clause,), args)
+            for row in txn:
+                typ, state_key, event_id = row
+                key = (typ, state_key)
+                result[key] = event_id
         else:
             max_entries_returned = state_filter.max_entries_returned()
 
             # We don't use WITH RECURSIVE on sqlite3 as there are distributions
            # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
-            for group in groups:
-                next_group = group
+            next_group = group
 
-                while next_group:
-                    # We did this before by getting the list of group ids, and
-                    # then passing that list to sqlite to get latest event for
-                    # each (type, state_key). However, that was terribly slow
-                    # without the right indices (which we can't add until
-                    # after we finish deduping state, which requires this func)
-                    args = [next_group]
-                    args.extend(where_args)
+            while next_group:
+                # We did this before by getting the list of group ids, and
+                # then passing that list to sqlite to get latest event for
+                # each (type, state_key). However, that was terribly slow
+                # without the right indices (which we can't add until
+                # after we finish deduping state, which requires this func)
+                args = [next_group]
+                args.extend(where_args)
 
-                    txn.execute(
-                        "SELECT type, state_key, event_id FROM state_groups_state"
-                        " WHERE state_group = ? " + where_clause,
-                        args,
-                    )
-                    results[group].update(
-                        ((typ, state_key), event_id)
-                        for typ, state_key, event_id in txn
-                        if (typ, state_key) not in results[group]
-                    )
+                txn.execute(
+                    "SELECT type, state_key, event_id FROM state_groups_state"
+                    " WHERE state_group = ? " + where_clause,
+                    args,
+                )
+                result.update(
+                    ((typ, state_key), event_id)
+                    for typ, state_key, event_id in txn
+                    if (typ, state_key) not in result
+                )
 
-                    # If the number of entries in the (type,state_key)->event_id dict
-                    # matches the number of (type,state_keys) types we were searching
-                    # for, then we must have found them all, so no need to go walk
-                    # further down the tree... UNLESS our types filter contained
-                    # wildcards (i.e. Nones) in which case we have to do an exhaustive
-                    # search
-                    if (
-                        max_entries_returned is not None
-                        and len(results[group]) == max_entries_returned
-                    ):
-                        break
+                # If the number of entries in the (type,state_key)->event_id dict
+                # matches the number of (type,state_keys) types we were searching
+                # for, then we must have found them all, so no need to go walk
+                # further down the tree... UNLESS our types filter contained
+                # wildcards (i.e. Nones) in which case we have to do an exhaustive
+                # search
+                if (
+                    max_entries_returned is not None
+                    and len(result) == max_entries_returned
+                ):
+                    break
 
-                    next_group = self.db_pool.simple_select_one_onecol_txn(
-                        txn,
-                        table="state_group_edges",
-                        keyvalues={"state_group": next_group},
-                        retcol="prev_state_group",
-                        allow_none=True,
-                    )
+                next_group = self.db_pool.simple_select_one_onecol_txn(
+                    txn,
+                    table="state_group_edges",
+                    keyvalues={"state_group": next_group},
+                    retcol="prev_state_group",
+                    allow_none=True,
+                )
 
-        return results
+        return result
 
 
 class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
@@ -261,14 +263,10 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
                     # otherwise read performance degrades.
                     continue
 
-                prev_state = self._get_state_groups_from_groups_txn(
-                    txn, [prev_group]
-                )
-                prev_state = prev_state[prev_group]
+                prev_state = self._get_state_groups_from_group_txn(txn, prev_group)
 
-                curr_state = self._get_state_groups_from_groups_txn(
-                    txn, [state_group]
-                )
-                curr_state = curr_state[state_group]
+                curr_state = self._get_state_groups_from_group_txn(txn, state_group)
 
                 if not set(prev_state.keys()) - set(curr_state.keys()):
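On SQLite the hunk above walks the `state_group_edges` chain by hand (no `WITH RECURSIVE`), keeping the first `event_id` seen for each `(type, state_key)`, since nearer groups override their ancestors. A self-contained sketch of that walk, assuming simplified versions of the two tables:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE state_groups_state (
        state_group INTEGER, type TEXT, state_key TEXT, event_id TEXT
    );
    CREATE TABLE state_group_edges (
        state_group INTEGER, prev_state_group INTEGER
    );
    -- group 2 is a delta on top of group 1
    INSERT INTO state_groups_state VALUES (1, 'm.room.create', '', '$create');
    INSERT INTO state_groups_state VALUES (1, 'm.room.name', '', '$old_name');
    INSERT INTO state_groups_state VALUES (2, 'm.room.name', '', '$new_name');
    INSERT INTO state_group_edges VALUES (2, 1);
    """
)


def state_for_group(txn, group):
    """Walk the prev_state_group chain; nearer groups win for each key."""
    result = {}
    next_group = group
    while next_group is not None:
        rows = txn.execute(
            "SELECT type, state_key, event_id FROM state_groups_state"
            " WHERE state_group = ?",
            (next_group,),
        )
        for typ, state_key, event_id in rows:
            # setdefault keeps the first-seen entry, mirroring the
            # "if (typ, state_key) not in result" check in the diff.
            result.setdefault((typ, state_key), event_id)
        row = txn.execute(
            "SELECT prev_state_group FROM state_group_edges WHERE state_group = ?",
            (next_group,),
        ).fetchone()
        next_group = row[0] if row else None
    return result


print(state_for_group(conn, 2))
# {('m.room.name', ''): '$new_name', ('m.room.create', ''): '$create'}
```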
synapse/storage/databases/state/store.py (path inferred from the class below)

@@ -26,6 +26,7 @@ from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import MutableStateMap, StateMap
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.dictionary_cache import DictionaryCache
+from synapse.util.caches.response_cache import ResponseCache
 
 logger = logging.getLogger(__name__)
 
@@ -91,6 +92,15 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
             500000,
         )
 
+        self._state_group_from_group_cache = ResponseCache(
+            self.hs.get_clock(),
+            # REVIEW: why do the other 2 have asterisks? should this one too?
+            "*stateGroupFromGroupCache*",
+            # we're only using this cache to track in-flight requests;
+            # the results are added to another cache once complete.
+            timeout_ms=0,
+        )
+
         def get_max_state_group_txn(txn: Cursor):
             txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
             return txn.fetchone()[0]
@@ -156,19 +166,39 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         """
         results = {}
 
-        chunks = [groups[i : i + 100] for i in range(0, len(groups), 100)]
-        for chunk in chunks:
-            res = await self.db_pool.runInteraction(
-                "_get_state_groups_from_groups",
-                self._get_state_groups_from_groups_txn,
-                chunk,
-                state_filter,
+        for group in groups:
+            results[group] = await self._get_state_groups_from_group(
+                group, state_filter
             )
-            results.update(res)
 
         return results
 
-    def _get_state_for_group_using_cache(self, cache, group, state_filter):
+    async def _get_state_groups_from_group(
+        self, group: int, state_filter: StateFilter
+    ) -> StateMap[str]:
+        """Returns the state groups for a given group from the
+        database, filtering on types of state events.
+
+        Args:
+            group: state group ID to query
+            state_filter: The state filter used to fetch state
+                from the database.
+        Returns:
+            state map
+        """
+
+        return await self._state_group_from_group_cache.wrap(
+            (group, state_filter),
+            self.db_pool.runInteraction,
+            "_get_state_groups_from_group",
+            self._get_state_groups_from_group_txn,
+            group,
+            state_filter,
+        )
+
+    def _get_state_for_group_using_cache(
+        self, cache: DictionaryCache, group: int, state_filter: StateFilter
+    ) -> Tuple[StateMap, bool]:
         """Checks if group is in cache. See `_get_state_for_groups`
 
         Args:
@@ -546,7 +576,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
             # groups to non delta versions.
             for sg in remaining_state_groups:
                 logger.info("[purge] de-delta-ing remaining state group %s", sg)
-                curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
-                curr_state = curr_state[sg]
+                curr_state = self._get_state_groups_from_group_txn(txn, sg)
 
                 self.db_pool.simple_delete_txn(
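The new `_state_group_from_group_cache` is constructed with `timeout_ms=0`, so (per the inline comment) it only tracks in-flight requests: concurrent callers asking for the same `(group, state_filter)` share one database hit, and nothing is retained once the request completes. A rough asyncio analogue of that behaviour, not Synapse's actual `ResponseCache` implementation:

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Hashable


class InFlightCache:
    """Deduplicates concurrent calls for the same key, caching nothing once
    a call completes (like a ResponseCache with timeout_ms=0)."""

    def __init__(self) -> None:
        self._pending: Dict[Hashable, asyncio.Task] = {}

    async def wrap(
        self, key: Hashable, callback: Callable[..., Awaitable[Any]], *args: Any
    ) -> Any:
        task = self._pending.get(key)
        if task is None:
            task = asyncio.ensure_future(callback(*args))
            self._pending[key] = task
            # Evict as soon as the request finishes, success or failure.
            task.add_done_callback(lambda _: self._pending.pop(key, None))
        return await task


async def main() -> None:
    cache = InFlightCache()
    calls = 0

    async def fetch(group: int) -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # stand-in for a database round-trip
        return f"state for group {group}"

    # Two concurrent requests for the same (group, filter) key share one fetch...
    await asyncio.gather(
        cache.wrap((1, "filter"), fetch, 1),
        cache.wrap((1, "filter"), fetch, 1),
    )
    # ...but once the result has been delivered, a new request fetches again.
    await cache.wrap((1, "filter"), fetch, 1)
    assert calls == 2


asyncio.run(main())
```

Keying on `(group, state_filter)` is also why `StateFilter` has to become hashable, which is what the changes in the next file arrange.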
synapse/storage/state.py (path inferred from the class below)

@@ -25,12 +25,15 @@ from typing import (
 )
 
 import attr
+from frozendict import frozendict
 
 from synapse.api.constants import EventTypes
 from synapse.events import EventBase
 from synapse.types import MutableStateMap, StateMap
 
 if TYPE_CHECKING:
+    from typing import FrozenSet  # noqa: used within quoted type hint; flake8 sad
+
     from synapse.server import HomeServer
     from synapse.storage.databases import Databases
 
@@ -40,7 +43,7 @@ logger = logging.getLogger(__name__)
 T = TypeVar("T")
 
 
-@attr.s(slots=True)
+@attr.s(slots=True, frozen=True)
 class StateFilter:
     """A filter used when querying for state.
 
@@ -53,14 +56,20 @@ class StateFilter:
         appear in `types`.
     """
 
-    types = attr.ib(type=Dict[str, Optional[Set[str]]])
+    types = attr.ib(type="frozendict[str, Optional[FrozenSet[str]]]")
     include_others = attr.ib(default=False, type=bool)
 
     def __attrs_post_init__(self):
         # If `include_others` is set we canonicalise the filter by removing
         # wildcards from the types dictionary
         if self.include_others:
-            self.types = {k: v for k, v in self.types.items() if v is not None}
+            # REVIEW: yucky. any better way?
+            # Work around this class being frozen.
+            object.__setattr__(
+                self,
+                "types",
+                frozendict({k: v for k, v in self.types.items() if v is not None}),
+            )
 
     @staticmethod
     def all() -> "StateFilter":
@@ -69,7 +78,7 @@ class StateFilter:
         Returns:
             The new state filter.
         """
-        return StateFilter(types={}, include_others=True)
+        return StateFilter(types=frozendict(), include_others=True)
 
     @staticmethod
     def none() -> "StateFilter":
@@ -78,7 +87,7 @@ class StateFilter:
         Returns:
             The new state filter.
         """
-        return StateFilter(types={}, include_others=False)
+        return StateFilter(types=frozendict(), include_others=False)
 
     @staticmethod
     def from_types(types: Iterable[Tuple[str, Optional[str]]]) -> "StateFilter":
@@ -103,7 +112,12 @@ class StateFilter:
 
             type_dict.setdefault(typ, set()).add(s)  # type: ignore
 
-        return StateFilter(types=type_dict)
+        return StateFilter(
+            types=frozendict(
+                (k, frozenset(v) if v is not None else None)
+                for k, v in type_dict.items()
+            )
+        )
 
     @staticmethod
     def from_lazy_load_member_list(members: Iterable[str]) -> "StateFilter":
@@ -116,7 +130,10 @@ class StateFilter:
         Returns:
             The new state filter
         """
-        return StateFilter(types={EventTypes.Member: set(members)}, include_others=True)
+        return StateFilter(
+            types=frozendict({EventTypes.Member: frozenset(members)}),
+            include_others=True,
+        )
 
     def return_expanded(self) -> "StateFilter":
         """Creates a new StateFilter where type wild cards have been removed
@@ -173,7 +190,7 @@ class StateFilter:
             # We want to return all non-members, but only particular
             # memberships
             return StateFilter(
-                types={EventTypes.Member: self.types[EventTypes.Member]},
+                types=frozendict({EventTypes.Member: self.types[EventTypes.Member]}),
                 include_others=True,
             )
 
@@ -324,14 +341,16 @@ class StateFilter:
             if state_keys is None:
                 member_filter = StateFilter.all()
             else:
-                member_filter = StateFilter({EventTypes.Member: state_keys})
+                member_filter = StateFilter(frozendict({EventTypes.Member: state_keys}))
         elif self.include_others:
             member_filter = StateFilter.all()
         else:
             member_filter = StateFilter.none()
 
         non_member_filter = StateFilter(
-            types={k: v for k, v in self.types.items() if k != EventTypes.Member},
+            types=frozendict(
+                {k: v for k, v in self.types.items() if k != EventTypes.Member}
+            ),
             include_others=self.include_others,
        )
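Making the class `frozen=True` (with equality generation left at its default) is what gives `StateFilter` a generated `__hash__`, and the immutable `frozendict`/`FrozenSet` members keep that hash meaningful; the cost is that `__attrs_post_init__` must write through `object.__setattr__`, as the REVIEW comment laments. A small sketch of the same pattern, assuming the `attrs` and `frozendict` packages are installed (and simplified: unlike Synapse, it canonicalises unconditionally):

```python
import attr
from frozendict import frozendict


@attr.s(slots=True, frozen=True)
class Filter:
    # In a frozen, hashable class every field must itself be immutable,
    # hence frozendict/frozenset rather than dict/set.
    types = attr.ib(type="frozendict[str, frozenset]")
    include_others = attr.ib(default=False, type=bool)

    def __attrs_post_init__(self):
        # Frozen classes raise FrozenInstanceError on normal assignment,
        # so canonicalisation has to go through object.__setattr__.
        object.__setattr__(
            self,
            "types",
            frozendict({k: v for k, v in self.types.items() if v is not None}),
        )


f1 = Filter(types=frozendict({"m.room.member": frozenset({"@alice:test"}), "x": None}))
f2 = Filter(types=frozendict({"m.room.member": frozenset({"@alice:test"})}))
assert f1 == f2            # canonicalised: the None entry was dropped
cache = {(42, f1): "hit"}  # hashable, so usable as part of a cache key
assert cache[(42, f2)] == "hit"
```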
tests/storage/test_state.py

@@ -14,6 +14,8 @@
 
 import logging
 
+from frozendict import frozendict
+
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.room_versions import RoomVersions
 from synapse.storage.state import StateFilter
@@ -183,7 +185,9 @@ class StateStoreTestCase(HomeserverTestCase):
             self.storage.state.get_state_for_event(
                 e5.event_id,
                 state_filter=StateFilter(
-                    types={EventTypes.Member: {self.u_alice.to_string()}},
+                    types=frozendict(
+                        {EventTypes.Member: frozenset({self.u_alice.to_string()})}
+                    ),
                     include_others=True,
                 ),
             )
@@ -203,7 +207,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.storage.state.get_state_for_event(
                 e5.event_id,
                 state_filter=StateFilter(
-                    types={EventTypes.Member: set()}, include_others=True
+                    types=frozendict({EventTypes.Member: frozenset()}),
+                    include_others=True,
                 ),
             )
         )
@@ -228,7 +233,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: set()}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset()}), include_others=True
             ),
         )
 
@@ -245,7 +250,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: set()}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset()}), include_others=True
             ),
         )
 
@@ -258,7 +263,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: None}, include_others=True
+                types=frozendict({EventTypes.Member: None}), include_others=True
             ),
         )
 
@@ -275,7 +280,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: None}, include_others=True
+                types=frozendict({EventTypes.Member: None}), include_others=True
             ),
         )
 
@@ -295,7 +300,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=True,
             ),
         )
 
@@ -312,7 +318,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=True,
             ),
         )
 
@@ -325,7 +332,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=False
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=False,
             ),
         )
 
@@ -375,7 +383,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: set()}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset()}), include_others=True
             ),
         )
 
@@ -387,7 +395,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: set()}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset()}), include_others=True
             ),
         )
 
@@ -400,7 +408,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: None}, include_others=True
+                types=frozendict({EventTypes.Member: None}), include_others=True
             ),
         )
 
@@ -411,7 +419,7 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: None}, include_others=True
+                types=frozendict({EventTypes.Member: None}), include_others=True
             ),
         )
 
@@ -430,7 +438,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=True,
             ),
         )
 
@@ -441,7 +450,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=True
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=True,
             ),
         )
 
@@ -454,7 +464,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=False
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=False,
             ),
         )
 
@@ -465,7 +476,8 @@ class StateStoreTestCase(HomeserverTestCase):
             self.state_datastore._state_group_members_cache,
             group,
             state_filter=StateFilter(
-                types={EventTypes.Member: {e5.state_key}}, include_others=False
+                types=frozendict({EventTypes.Member: frozenset({e5.state_key})}),
+                include_others=False,
            ),
         )
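The test updates are mechanical: with `StateFilter` frozen and hashable, its `types` member must be built from `frozendict`/`frozenset`, because a mutable `dict` or `set` has no hash. For example:

```python
# A frozenset can sit inside a hashable StateFilter / cache key:
print(hash(frozenset({"@alice:test"})))

# A plain set cannot:
try:
    hash({"@alice:test"})
except TypeError as e:
    print(e)  # unhashable type: 'set'
```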