Compare commits

...

17 Commits

Author SHA1 Message Date
David Robertson
6c7aecd67f Use _clock again and note the typing problem 2021-11-24 12:35:19 +00:00
David Robertson
4686190945 SlavedEventStore doesn't need StreamWorkerStore
already has it via SlavedEventStore -> BaseEventStore ->
CacheInvalidationWorkerStore -> StreamWorkerStore
2021-11-18 18:48:08 +00:00
David Robertson
e5d15cc677 Fix typo 2021-11-18 18:42:56 +00:00
David Robertson
1138f18cde Use _clock; weaken StatsStore to StateDeltasStore
And while we're at it, StatsStore doesn't needs its own clock attribute.
2021-11-18 18:42:43 +00:00
David Robertson
dfe4779f43 Fixup portdb script
Being honest I just tried to do the minimal thing possible to get it to
work.
2021-11-16 18:47:57 +00:00
David Robertson
7f6c2aab57 Fixup imports 2021-11-16 18:47:57 +00:00
David Robertson
ee1803551d Mypy now passes. 2021-11-16 18:47:46 +00:00
David Robertson
13b9509e76 Pull in RoomMemberWorkerStore
for get_rooms_for_user_with_stream_ordering.

This was the most painful MRO problem to resolve.

Remove BaseSlavedStore and SlavedEventStore from
GenericWorkerSlavedStore. These are already pulled in indirectly by
inheriting from SlavedPushRuleStore.
2021-11-16 18:46:52 +00:00
David Robertson
ea34499c9b Pull in StatsStore
I need StateDeltasStore for _curr_state_delta_stream_cache.
I also need the `clock` attribute, which pathologically only seems to be
defined on StatsStore. StatsStore is a subclass of StateDeltasStore, so
let's just use that to keep the list of parents minimal.

To be explicit: I think it's very very odd that only StatsStore provides
a `clock`. I think pulling it in here is silly: why should
CacheInvalidationWorkerStore care about stats? Consider this a protest
commit to raise attention to the bizarre status quo.
2021-11-16 18:46:52 +00:00
David Robertson
d251ab8785 Cache needs StreamWorkerStore
for _membership_stream_cache
2021-11-16 18:46:51 +00:00
David Robertson
0e7d79b61d Pull in EventPushActionsWorkerStore 2021-11-16 18:46:51 +00:00
David Robertson
1dea20ca75 CacheInvalidationWorkerStore needs RelationsWorkerStore
for e.g. get_relations_for_event.

Remove it from SlavedEventsStore. (Still there via SlavedEventsStore ->
BaseSlavedStore -> CacheInvalidationWorkerStore -> RelationsWorkerStore.
2021-11-16 18:46:51 +00:00
David Robertson
58c20bbcb7 LoggingDatabaseConnection.__enter__ returns itself
Could use a fancy TypeVar here but I restrained myself.
2021-11-16 18:46:51 +00:00
David Robertson
f7111a9805 mark _cache_id_gen as a required mixin property 2021-11-16 18:46:47 +00:00
David Robertson
9a316ecd3c SCARY: Pull in EventFederationWorkerStore
for e.g. get_latest_event_ids_in_room

Needed on CacheInvalidationWorkerStore for many functions, e.g.
`have_seen_event`.

Fixup MRO in CensorEventsStore.

The BaseSlavedStore is trickier. The removals should be safe because the
removed classes are in parents, via SlavedEventStore -> BaseSlavedStore
-> CacheInvalidationWorkerStore
2021-11-16 11:49:11 +00:00
David Robertson
e140beeb56 Easier function annotations 2021-11-16 11:48:45 +00:00
David Robertson
bb150be1ad Annotate get_all_updates_caches_txn 2021-11-15 13:00:04 +00:00
17 changed files with 86 additions and 68 deletions

1
changelog.d/11354.misc Normal file
View File

@@ -0,0 +1 @@
Add type hints to storage classes.

View File

@@ -26,7 +26,6 @@ exclude = (?x)
|synapse/storage/databases/__init__.py
|synapse/storage/databases/main/__init__.py
|synapse/storage/databases/main/account_data.py
|synapse/storage/databases/main/cache.py
|synapse/storage/databases/main/devices.py
|synapse/storage/databases/main/e2e_room_keys.py
|synapse/storage/databases/main/end_to_end_keys.py

View File

@@ -35,6 +35,7 @@ from synapse.logging.context import (
make_deferred_yieldable,
run_in_background,
)
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.storage.database import DatabasePool, make_conn
from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
@@ -57,7 +58,6 @@ from synapse.storage.databases.main.room import RoomBackgroundUpdateStore
from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.databases.main.search import SearchBackgroundUpdateStore
from synapse.storage.databases.main.state import MainStateBackgroundUpdateStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.user_directory import (
UserDirectoryBackgroundUpdateStore,
)
@@ -179,10 +179,10 @@ class Store(
MainStateBackgroundUpdateStore,
UserDirectoryBackgroundUpdateStore,
EndToEndKeyBackgroundStore,
StatsStore,
PusherWorkerStore,
PresenceBackgroundUpdateStore,
GroupServerWorkerStore,
SlavedEventStore,
):
def execute(self, f, *args, **kwargs):
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
@@ -229,6 +229,10 @@ class MockHomeserver:
def get_instance_name(self):
return "master"
def should_send_federation(self) -> bool:
"Should this server be sending federation traffic directly?"
return False
class Porter(object):
def __init__(self, **kwargs):

View File

@@ -28,7 +28,6 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.events import EventBase
from synapse.handlers.admin import ExfiltrationWriter
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -61,7 +60,6 @@ class AdminCmdSlavedStore(
SlavedPushRuleStore,
SlavedEventStore,
SlavedClientIpStore,
BaseSlavedStore,
RoomWorkerStore,
):
pass

View File

@@ -47,14 +47,12 @@ from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
@@ -114,7 +112,6 @@ from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.storage.databases.main.search import SearchStore
from synapse.storage.databases.main.session import SessionStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
@@ -223,7 +220,6 @@ class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
UserDirectoryStore,
StatsStore,
UIAuthWorkerStore,
EndToEndRoomKeyStore,
PresenceStore,
@@ -236,7 +232,6 @@ class GenericWorkerSlavedStore(
SlavedPusherStore,
CensorEventsStore,
ClientIpWorkerStore,
SlavedEventStore,
SlavedKeyStore,
RoomWorkerStore,
DirectoryStore,
@@ -252,7 +247,6 @@ class GenericWorkerSlavedStore(
TransactionWorkerStore,
LockStore,
SessionStore,
BaseSlavedStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.

View File

@@ -13,7 +13,7 @@
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
@@ -30,9 +30,7 @@ class BaseSlavedStore(CacheInvalidationWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen: Optional[
MultiWriterIdGenerator
] = MultiWriterIdGenerator(
self._cache_id_gen = MultiWriterIdGenerator(
db_conn,
database,
stream_name="caches",

View File

@@ -16,16 +16,7 @@ import logging
from typing import TYPE_CHECKING
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
from synapse.storage.databases.main.event_push_actions import (
EventPushActionsWorkerStore,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.relations import RelationsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -47,15 +38,8 @@ logger = logging.getLogger(__name__)
class SlavedEventStore(
EventFederationWorkerStore,
RoomMemberWorkerStore,
EventPushActionsWorkerStore,
StreamWorkerStore,
StateGroupWorkerStore,
EventsWorkerStore,
SignatureWorkerStore,
UserErasureWorkerStore,
RelationsWorkerStore,
BaseSlavedStore,
):
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):

View File

@@ -20,7 +20,7 @@ from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from .events import SlavedEventStore
class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore):
def get_max_push_rules_stream_id(self):
return self._push_rules_stream_id_gen.get_current_token()

View File

@@ -175,7 +175,7 @@ class LoggingDatabaseConnection:
def rollback(self) -> None:
self.conn.rollback()
def __enter__(self) -> "Connection":
def __enter__(self) -> "LoggingDatabaseConnection":
self.conn.__enter__()
return self

View File

@@ -67,7 +67,6 @@ from .search import SearchStore
from .session import SessionStore
from .signatures import SignatureStore
from .state import StateStore
from .stats import StatsStore
from .stream import StreamStore
from .tags import TagsStore
from .transactions import TransactionWorkerStore
@@ -119,7 +118,6 @@ class DataStore(
GroupServerStore,
UserErasureStore,
MonthlyActiveUsersStore,
StatsStore,
RelationsStore,
CensorEventsStore,
UIAuthStore,
@@ -154,7 +152,6 @@ class DataStore(
db_conn, "local_group_updates", "stream_id"
)
self._cache_id_gen: Optional[MultiWriterIdGenerator]
if isinstance(self.database_engine, PostgresEngine):
# We set the `writers` to an empty list here as we don't care about
# missing updates over restarts, as we'll not have anything in our

View File

@@ -15,7 +15,7 @@
import itertools
import logging
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple
from typing import TYPE_CHECKING, Collection, Iterable, List, Optional, Tuple
from synapse.api.constants import EventTypes
from synapse.replication.tcp.streams import BackfillStream, CachesStream
@@ -24,9 +24,22 @@ from synapse.replication.tcp.streams.events import (
EventsStreamCurrentStateRow,
EventsStreamEventRow,
)
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
from synapse.storage.databases.main.event_push_actions import (
EventPushActionsWorkerStore,
)
from synapse.storage.databases.main.relations import RelationsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.util.caches.descriptors import _CachedFunction
from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
@@ -39,16 +52,35 @@ logger = logging.getLogger(__name__)
# based on the current state when notifying workers over replication.
CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
# Corresponds to the (cache_func, keys, invalidation_ts) db columns.
_CacheData = Tuple[str, Optional[List[str]], Optional[int]]
class CacheInvalidationWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
class CacheInvalidationWorkerStore(
EventFederationWorkerStore,
RelationsWorkerStore,
EventPushActionsWorkerStore,
StreamWorkerStore,
StateDeltasStore,
RoomMemberWorkerStore,
):
# This class must be mixed in with a child class which provides the following
# attribute. TODO: can we get static analysis to enforce this?
_cache_id_gen: Optional[MultiWriterIdGenerator]
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self._instance_name = hs.get_instance_name()
async def get_all_updated_caches(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
) -> Tuple[List[Tuple[int, _CacheData]], int, bool]:
"""Get updates for caches replication stream.
Args:
@@ -73,7 +105,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
if last_id == current_id:
return [], current_id, False
def get_all_updated_caches_txn(txn):
def get_all_updated_caches_txn(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, _CacheData]], int, bool]:
# We purposefully don't bound by the current token, as we want to
# send across cache invalidations as quickly as possible. Cache
# invalidations are idempotent, so duplicates are fine.
@@ -85,7 +119,13 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
LIMIT ?
"""
txn.execute(sql, (last_id, instance_name, limit))
updates = [(row[0], row[1:]) for row in txn]
updates: List[Tuple[int, _CacheData]] = []
row: Tuple[int, str, Optional[List[str]], Optional[int]]
# Type safety: iterating over `txn` yields `Tuple`, i.e.
# `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a
# variadic tuple to a fixed length tuple and flags it up as an error.
for row in txn: # type: ignore[assignment]
updates.append((row[0], row[1:]))
limited = False
upto_token = current_id
if len(updates) >= limit:
@@ -192,7 +232,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self.get_aggregation_groups_for_event.invalidate((relates_to,))
self.get_applicable_edit.invalidate((relates_to,))
async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[str, ...]
) -> None:
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
@@ -212,7 +254,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
keys,
)
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
def _invalidate_cache_and_stream(
self, txn: LoggingTransaction, cache_func: _CachedFunction, keys: Iterable[str]
) -> None:
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
@@ -223,7 +267,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
txn.call_after(cache_func.invalidate, keys)
self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
def _invalidate_all_cache_and_stream(self, txn, cache_func):
def _invalidate_all_cache_and_stream(
self, txn: LoggingTransaction, cache_func: _CachedFunction
) -> None:
"""Invalidates the entire cache and adds it to the cache stream so slaves
will know to invalidate their caches.
"""
@@ -231,7 +277,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
txn.call_after(cache_func.invalidate_all)
self._send_invalidation_to_replication(txn, cache_func.__name__, None)
def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
def _invalidate_state_caches_and_stream(
self, txn: LoggingTransaction, room_id: str, members_changed: Collection[str]
) -> None:
"""Special case invalidation of caches based on current state.
We special case this so that we can batch the cache invalidations into a
@@ -239,8 +287,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
Args:
txn
room_id (str): Room where state changed
members_changed (iterable[str]): The user_ids of members that have changed
room_id: Room where state changed
members_changed: The user_ids of members that have changed
"""
txn.call_after(self._invalidate_state_caches, room_id, members_changed)
@@ -262,8 +310,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
)
def _send_invalidation_to_replication(
self, txn, cache_name: str, keys: Optional[Iterable[Any]]
):
self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[str]]
) -> None:
"""Notifies replication that given cache has been invalidated.
Note that this does *not* invalidate the cache locally.
@@ -284,6 +332,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
# the transaction. However, we want to only get an ID when we want
# to use it, here, so we need to call __enter__ manually, and have
# __exit__ called after the transaction finishes.
assert self._cache_id_gen is not None
stream_id = self._cache_id_gen.get_next_txn(txn)
txn.call_after(self.hs.get_notifier().on_new_replication_data)
@@ -298,7 +347,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
"instance_name": self._instance_name,
"cache_func": cache_name,
"keys": keys,
"invalidation_ts": self.clock.time_msec(),
"invalidation_ts": self._clock.time_msec(),
},
)

View File

@@ -17,10 +17,8 @@ from typing import TYPE_CHECKING, Optional
from synapse.events.utils import prune_event_dict
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.util import json_encoder
if TYPE_CHECKING:
@@ -30,7 +28,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBaseStore):
class CensorEventsStore(CacheInvalidationWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
super().__init__(database, db_conn, hs)

View File

@@ -18,15 +18,11 @@ from typing import Any, Dict, List
from synapse.api.errors import SynapseError
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
logger = logging.getLogger(__name__)
class EventForwardExtremitiesStore(
EventFederationWorkerStore,
CacheInvalidationWorkerStore,
):
class EventForwardExtremitiesStore(CacheInvalidationWorkerStore):
async def delete_forward_extremities_for_room(self, room_id: str) -> int:
"""Delete any extra forward extremities for a room.

View File

@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Dict, List, Tuple, Union
from synapse.api.errors import NotFoundError, StoreError
from synapse.push.baserules import list_with_base_rules
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage._base import db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
@@ -71,7 +71,6 @@ class PushRulesWorkerStore(
PusherWorkerStore,
RoomMemberWorkerStore,
EventsWorkerStore,
SQLBaseStore,
metaclass=abc.ABCMeta,
):
"""This is an abstract base class where subclasses must implement

View File

@@ -1877,7 +1877,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
return res if res else False
class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
class RegistrationStore(RegistrationBackgroundUpdateStore, StatsStore):
def __init__(
self,
database: DatabasePool,

View File

@@ -100,7 +100,6 @@ class StatsStore(StateDeltasStore):
super().__init__(database, db_conn, hs)
self.server_name = hs.hostname
self.clock = self.hs.get_clock()
self.stats_enabled = hs.config.stats.stats_enabled
self.stats_delta_processing_lock = DeferredLock()
@@ -601,7 +600,7 @@ class StatsStore(StateDeltasStore):
local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)]
await self.update_stats_delta(
ts=self.clock.time_msec(),
ts=self._clock.time_msec(),
stats_type="room",
stats_id=room_id,
fields={},
@@ -638,7 +637,7 @@ class StatsStore(StateDeltasStore):
)
await self.update_stats_delta(
ts=self.clock.time_msec(),
ts=self._clock.time_msec(),
stats_type="user",
stats_id=user_id,
fields={},

View File

@@ -146,7 +146,9 @@ async def filter_events_for_client(
max_lifetime = retention_policy.get("max_lifetime")
if max_lifetime is not None:
oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime
# TODO: reveal_type(storage.main) yields Any. Can we find a way of
# telling mypy that storage.main is a generic `DataStoreT`?
oldest_allowed_ts = storage.main._clock.time_msec() - max_lifetime
if event.origin_server_ts < oldest_allowed_ts:
return None