Mirror of https://github.com/element-hq/synapse.git, synced 2025-12-07 01:20:16 +00:00.
Compare commits: erikj/dock...dmr/typing (17 commits)
| SHA1 |
|---|
| 6c7aecd67f |
| 4686190945 |
| e5d15cc677 |
| 1138f18cde |
| dfe4779f43 |
| 7f6c2aab57 |
| ee1803551d |
| 13b9509e76 |
| ea34499c9b |
| d251ab8785 |
| 0e7d79b61d |
| 1dea20ca75 |
| 58c20bbcb7 |
| f7111a9805 |
| 9a316ecd3c |
| e140beeb56 |
| bb150be1ad |
changelog.d/11354.misc (new file)

@@ -0,0 +1 @@
+Add type hints to storage classes.
mypy.ini

@@ -26,7 +26,6 @@ exclude = (?x)
    |synapse/storage/databases/__init__.py
    |synapse/storage/databases/main/__init__.py
    |synapse/storage/databases/main/account_data.py
-   |synapse/storage/databases/main/cache.py
    |synapse/storage/databases/main/devices.py
    |synapse/storage/databases/main/e2e_room_keys.py
    |synapse/storage/databases/main/end_to_end_keys.py
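Dropping a path from the `exclude` pattern means mypy now type-checks that module, which is what the rest of this branch enables for cache.py. A quick local sanity check, as a minimal sketch assuming mypy is installed (`mypy.api` is mypy's documented programmatic entry point):

```python
# Sketch: verify that a module newly removed from mypy.ini's exclude list
# passes the type checker. api.run returns (stdout, stderr, exit_status).
from mypy import api

stdout, stderr, status = api.run(["synapse/storage/databases/main/cache.py"])
print(stdout or stderr)
assert status == 0, "module should now type-check cleanly"
```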
scripts/synapse_port_db

@@ -35,6 +35,7 @@ from synapse.logging.context import (
     make_deferred_yieldable,
     run_in_background,
 )
+from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.storage.database import DatabasePool, make_conn
 from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
 from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
@@ -57,7 +58,6 @@ from synapse.storage.databases.main.room import RoomBackgroundUpdateStore
 from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore
 from synapse.storage.databases.main.search import SearchBackgroundUpdateStore
 from synapse.storage.databases.main.state import MainStateBackgroundUpdateStore
-from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.databases.main.user_directory import (
     UserDirectoryBackgroundUpdateStore,
 )
@@ -179,10 +179,10 @@ class Store(
     MainStateBackgroundUpdateStore,
     UserDirectoryBackgroundUpdateStore,
     EndToEndKeyBackgroundStore,
-    StatsStore,
     PusherWorkerStore,
     PresenceBackgroundUpdateStore,
     GroupServerWorkerStore,
+    SlavedEventStore,
 ):
     def execute(self, f, *args, **kwargs):
         return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
@@ -229,6 +229,10 @@ class MockHomeserver:
     def get_instance_name(self):
         return "master"
 
+    def should_send_federation(self) -> bool:
+        "Should this server be sending federation traffic directly?"
+        return False
+
 
 class Porter(object):
     def __init__(self, **kwargs):
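For context on the `Store.execute` helper shown above: `runInteraction` runs a function inside a database transaction on a worker thread, passing the transaction object as the first argument, and `execute` reuses the function's own name as the interaction's description for logging and metrics. A hedged usage sketch (`count_rows` is a hypothetical helper, not part of the port script):

```python
# Hypothetical transaction function: it receives the transaction object
# first, then whatever extra arguments were passed to `execute`.
def count_rows(txn, table: str) -> int:
    # Illustrative only; a real caller would not interpolate table names.
    txn.execute("SELECT count(*) FROM " + table)
    (count,) = txn.fetchone()
    return count

# `store.execute(count_rows, "users")` would run the query on a database
# thread and record the interaction under the name "count_rows".
```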
synapse/app/admin_cmd.py

@@ -28,7 +28,6 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.events import EventBase
 from synapse.handlers.admin import ExfiltrationWriter
-from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -61,7 +60,6 @@ class AdminCmdSlavedStore(
     SlavedPushRuleStore,
     SlavedEventStore,
     SlavedClientIpStore,
-    BaseSlavedStore,
     RoomWorkerStore,
 ):
     pass
synapse/app/generic_worker.py

@@ -47,14 +47,12 @@ from synapse.http.site import SynapseRequest, SynapseSite
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
-from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.filtering import SlavedFilteringStore
 from synapse.replication.slave.storage.groups import SlavedGroupServerStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
@@ -114,7 +112,6 @@ from synapse.storage.databases.main.presence import PresenceStore
 from synapse.storage.databases.main.room import RoomWorkerStore
 from synapse.storage.databases.main.search import SearchStore
 from synapse.storage.databases.main.session import SessionStore
-from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.databases.main.transactions import TransactionWorkerStore
 from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
 from synapse.storage.databases.main.user_directory import UserDirectoryStore
@@ -223,7 +220,6 @@ class GenericWorkerSlavedStore(
     # FIXME(#3714): We need to add UserDirectoryStore as we write directly
     # rather than going via the correct worker.
     UserDirectoryStore,
-    StatsStore,
     UIAuthWorkerStore,
     EndToEndRoomKeyStore,
     PresenceStore,
@@ -236,7 +232,6 @@ class GenericWorkerSlavedStore(
     SlavedPusherStore,
     CensorEventsStore,
     ClientIpWorkerStore,
     SlavedEventStore,
     SlavedKeyStore,
     RoomWorkerStore,
     DirectoryStore,
@@ -252,7 +247,6 @@ class GenericWorkerSlavedStore(
     TransactionWorkerStore,
     LockStore,
     SessionStore,
-    BaseSlavedStore,
 ):
     # Properties that multiple storage classes define. Tell mypy what the
     # expected type is.
synapse/replication/slave/storage/_base.py

@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING
 
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
@@ -30,9 +30,7 @@ class BaseSlavedStore(CacheInvalidationWorkerStore):
     def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
         if isinstance(self.database_engine, PostgresEngine):
-            self._cache_id_gen: Optional[
-                MultiWriterIdGenerator
-            ] = MultiWriterIdGenerator(
+            self._cache_id_gen = MultiWriterIdGenerator(
                 db_conn,
                 database,
                 stream_name="caches",
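The verbose inline `Optional[...]` annotation can be dropped because `_cache_id_gen` is now declared once on `CacheInvalidationWorkerStore` (see the cache.py hunks below); subclasses simply assign to it. A minimal sketch of the pattern, with toy classes rather than Synapse's:

```python
from typing import Optional


class IdGenerator:
    """Stand-in for MultiWriterIdGenerator."""


class Base:
    # Declared once: mypy now knows every subclass has the attribute and
    # that None is a legitimate value.
    _cache_id_gen: Optional[IdGenerator]


class PostgresBackedStore(Base):
    def __init__(self, on_postgres: bool) -> None:
        # Plain assignment; no annotation needed at the assignment site.
        self._cache_id_gen = IdGenerator() if on_postgres else None
```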
synapse/replication/slave/storage/events.py

@@ -16,16 +16,7 @@ import logging
 from typing import TYPE_CHECKING
 
 from synapse.storage.database import DatabasePool
-from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
-from synapse.storage.databases.main.event_push_actions import (
-    EventPushActionsWorkerStore,
-)
-from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.databases.main.relations import RelationsWorkerStore
-from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
-from synapse.storage.databases.main.signatures import SignatureWorkerStore
 from synapse.storage.databases.main.state import StateGroupWorkerStore
-from synapse.storage.databases.main.stream import StreamWorkerStore
 from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -47,15 +38,8 @@ logger = logging.getLogger(__name__)
 
 
 class SlavedEventStore(
-    EventFederationWorkerStore,
-    RoomMemberWorkerStore,
-    EventPushActionsWorkerStore,
-    StreamWorkerStore,
     StateGroupWorkerStore,
-    EventsWorkerStore,
-    SignatureWorkerStore,
     UserErasureWorkerStore,
-    RelationsWorkerStore,
     BaseSlavedStore,
 ):
     def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
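Dropping these bases is behavior-preserving only because `BaseSlavedStore` now inherits the same worker stores via the reworked `CacheInvalidationWorkerStore` (see the cache.py hunks below). A quick sanity check one could run against this branch's code to confirm nothing fell out of the MRO:

```python
# Each store removed from SlavedEventStore's explicit base list should
# still be reachable through BaseSlavedStore -> CacheInvalidationWorkerStore.
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore

for base in (EventFederationWorkerStore, RoomMemberWorkerStore, StreamWorkerStore):
    assert issubclass(SlavedEventStore, base), base.__name__
```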
synapse/replication/slave/storage/push_rule.py

@@ -20,7 +20,7 @@ from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
 from .events import SlavedEventStore
 
 
-class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
+class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore):
     def get_max_push_rules_stream_id(self):
         return self._push_rules_stream_id_gen.get_current_token()
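The base swap matters because Python resolves attributes left to right along the MRO: with `PushRulesWorkerStore` first, its definitions now win over anything inherited through `SlavedEventStore`'s much larger mixin chain. A toy illustration:

```python
class A:
    def where(self) -> str:
        return "A"


class B:
    def where(self) -> str:
        return "B"


class AFirst(A, B):
    pass


class BFirst(B, A):
    pass


# The first listed base wins when both define the same attribute.
assert AFirst().where() == "A"
assert BFirst().where() == "B"
```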
synapse/storage/database.py

@@ -175,7 +175,7 @@ class LoggingDatabaseConnection:
     def rollback(self) -> None:
         self.conn.rollback()
 
-    def __enter__(self) -> "Connection":
+    def __enter__(self) -> "LoggingDatabaseConnection":
         self.conn.__enter__()
         return self
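`__enter__` returns `self`, so the old `"Connection"` annotation gave `with db_conn as conn:` the wrong type for `conn`. A minimal sketch of a typed connection wrapper (illustrative, not Synapse's full class):

```python
from types import TracebackType
from typing import Any, Optional, Type


class ConnectionWrapper:
    """Illustrative wrapper around a DB-API connection."""

    def __init__(self, conn: Any) -> None:
        self.conn = conn

    def __enter__(self) -> "ConnectionWrapper":
        self.conn.__enter__()
        return self  # `with wrapper as c:` now types `c` as the wrapper

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        return self.conn.__exit__(exc_type, exc_value, traceback)
```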
synapse/storage/databases/main/__init__.py

@@ -67,7 +67,6 @@ from .search import SearchStore
 from .session import SessionStore
 from .signatures import SignatureStore
 from .state import StateStore
-from .stats import StatsStore
 from .stream import StreamStore
 from .tags import TagsStore
 from .transactions import TransactionWorkerStore
@@ -119,7 +118,6 @@ class DataStore(
     GroupServerStore,
     UserErasureStore,
     MonthlyActiveUsersStore,
-    StatsStore,
     RelationsStore,
     CensorEventsStore,
     UIAuthStore,
@@ -154,7 +152,6 @@ class DataStore(
             db_conn, "local_group_updates", "stream_id"
         )
 
-        self._cache_id_gen: Optional[MultiWriterIdGenerator]
         if isinstance(self.database_engine, PostgresEngine):
             # We set the `writers` to an empty list here as we don't care about
             # missing updates over restarts, as we'll not have anything in our
synapse/storage/databases/main/cache.py

@@ -15,7 +15,7 @@
 
 import itertools
 import logging
-from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple
+from typing import TYPE_CHECKING, Collection, Iterable, List, Optional, Tuple
 
 from synapse.api.constants import EventTypes
 from synapse.replication.tcp.streams import BackfillStream, CachesStream
@@ -24,9 +24,22 @@ from synapse.replication.tcp.streams.events import (
     EventsStreamCurrentStateRow,
     EventsStreamEventRow,
 )
-from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import (
+    DatabasePool,
+    LoggingDatabaseConnection,
+    LoggingTransaction,
+)
+from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
+from synapse.storage.databases.main.event_push_actions import (
+    EventPushActionsWorkerStore,
+)
+from synapse.storage.databases.main.relations import RelationsWorkerStore
+from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
+from synapse.storage.databases.main.state_deltas import StateDeltasStore
+from synapse.storage.databases.main.stream import StreamWorkerStore
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
+from synapse.util.caches.descriptors import _CachedFunction
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
@@ -39,16 +52,35 @@ logger = logging.getLogger(__name__)
 # based on the current state when notifying workers over replication.
 CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
 
+# Corresponds to the (cache_func, keys, invalidation_ts) db columns.
+_CacheData = Tuple[str, Optional[List[str]], Optional[int]]
+
 
-class CacheInvalidationWorkerStore(SQLBaseStore):
-    def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
+class CacheInvalidationWorkerStore(
+    EventFederationWorkerStore,
+    RelationsWorkerStore,
+    EventPushActionsWorkerStore,
+    StreamWorkerStore,
+    StateDeltasStore,
+    RoomMemberWorkerStore,
+):
+    # This class must be mixed in with a child class which provides the following
+    # attribute. TODO: can we get static analysis to enforce this?
+    _cache_id_gen: Optional[MultiWriterIdGenerator]
+
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
         super().__init__(database, db_conn, hs)
 
         self._instance_name = hs.get_instance_name()
 
     async def get_all_updated_caches(
         self, instance_name: str, last_id: int, current_id: int, limit: int
-    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+    ) -> Tuple[List[Tuple[int, _CacheData]], int, bool]:
         """Get updates for caches replication stream.
 
         Args:
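On the TODO about statically enforcing the `_cache_id_gen` contract: one option would be a `typing.Protocol` that the mixin's helpers demand of `self`'s class. A hedged sketch of that idea (this is not what the branch does; it simply declares the attribute on the mixin):

```python
from typing import Optional, Protocol


class IdGen:
    def get_next_txn(self, txn: object) -> int:
        return 0  # stub standing in for MultiWriterIdGenerator


class HasCacheIdGen(Protocol):
    _cache_id_gen: Optional[IdGen]


def send_invalidation(store: HasCacheIdGen, txn: object) -> int:
    # mypy rejects callers whose class never declares _cache_id_gen.
    assert store._cache_id_gen is not None
    return store._cache_id_gen.get_next_txn(txn)
```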
@@ -73,7 +105,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         if last_id == current_id:
             return [], current_id, False
 
-        def get_all_updated_caches_txn(txn):
+        def get_all_updated_caches_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[List[Tuple[int, _CacheData]], int, bool]:
             # We purposefully don't bound by the current token, as we want to
             # send across cache invalidations as quickly as possible. Cache
             # invalidations are idempotent, so duplicates are fine.
@@ -85,7 +119,13 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                 LIMIT ?
             """
             txn.execute(sql, (last_id, instance_name, limit))
-            updates = [(row[0], row[1:]) for row in txn]
+            updates: List[Tuple[int, _CacheData]] = []
+            row: Tuple[int, str, Optional[List[str]], Optional[int]]
+            # Type safety: iterating over `txn` yields `Tuple`, i.e.
+            # `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a
+            # variadic tuple to a fixed length tuple and flags it up as an error.
+            for row in txn:  # type: ignore[assignment]
+                updates.append((row[0], row[1:]))
             limited = False
             upto_token = current_id
             if len(updates) >= limit:
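The `# type: ignore[assignment]` works around a real mypy limitation: rows from the cursor are typed `Tuple[Any, ...]` (arbitrary length), and assigning one to a variable annotated with a fixed-shape tuple is flagged even when the query guarantees the shape. A self-contained illustration:

```python
from typing import Any, List, Optional, Tuple

# What the database driver hands back: variadic, arbitrary-length tuples.
rows: List[Tuple[Any, ...]] = [(3, "get_users", None, 1637000000000)]

# The shape we know each row actually has.
row: Tuple[int, str, Optional[List[str]], Optional[int]]
for row in rows:  # type: ignore[assignment]
    stream_id, cache_func = row[0], row[1]
```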
@@ -192,7 +232,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             self.get_aggregation_groups_for_event.invalidate((relates_to,))
             self.get_applicable_edit.invalidate((relates_to,))
 
-    async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
+    async def invalidate_cache_and_stream(
+        self, cache_name: str, keys: Tuple[str, ...]
+    ) -> None:
         """Invalidates the cache and adds it to the cache stream so slaves
         will know to invalidate their caches.
@@ -212,7 +254,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             keys,
         )
 
-    def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+    def _invalidate_cache_and_stream(
+        self, txn: LoggingTransaction, cache_func: _CachedFunction, keys: Iterable[str]
+    ) -> None:
         """Invalidates the cache and adds it to the cache stream so slaves
         will know to invalidate their caches.
@@ -223,7 +267,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         txn.call_after(cache_func.invalidate, keys)
         self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
 
-    def _invalidate_all_cache_and_stream(self, txn, cache_func):
+    def _invalidate_all_cache_and_stream(
+        self, txn: LoggingTransaction, cache_func: _CachedFunction
+    ) -> None:
         """Invalidates the entire cache and adds it to the cache stream so slaves
         will know to invalidate their caches.
         """
@@ -231,7 +277,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         txn.call_after(cache_func.invalidate_all)
         self._send_invalidation_to_replication(txn, cache_func.__name__, None)
 
-    def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
+    def _invalidate_state_caches_and_stream(
+        self, txn: LoggingTransaction, room_id: str, members_changed: Collection[str]
+    ) -> None:
         """Special case invalidation of caches based on current state.
 
         We special case this so that we can batch the cache invalidations into a
@@ -239,8 +287,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
         Args:
             txn
-            room_id (str): Room where state changed
-            members_changed (iterable[str]): The user_ids of members that have changed
+            room_id: Room where state changed
+            members_changed: The user_ids of members that have changed
         """
         txn.call_after(self._invalidate_state_caches, room_id, members_changed)
@@ -262,8 +310,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         )
 
     def _send_invalidation_to_replication(
-        self, txn, cache_name: str, keys: Optional[Iterable[Any]]
-    ):
+        self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[str]]
+    ) -> None:
         """Notifies replication that given cache has been invalidated.
 
         Note that this does *not* invalidate the cache locally.
@@ -284,6 +332,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             # the transaction. However, we want to only get an ID when we want
             # to use it, here, so we need to call __enter__ manually, and have
             # __exit__ called after the transaction finishes.
+            assert self._cache_id_gen is not None
             stream_id = self._cache_id_gen.get_next_txn(txn)
             txn.call_after(self.hs.get_notifier().on_new_replication_data)
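The added `assert` is as much for mypy as for runtime safety: `_cache_id_gen` is declared `Optional`, and this code path is only reached on Postgres, where the generator is always set. The assert narrows the type. In miniature:

```python
from typing import Optional


class IdGen:
    def get_next(self) -> int:
        return 1


def next_stream_id(gen: Optional[IdGen]) -> int:
    # Without the assert, mypy reports: Item "None" of "Optional[IdGen]"
    # has no attribute "get_next".
    assert gen is not None
    return gen.get_next()
```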
@@ -298,7 +347,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                 "instance_name": self._instance_name,
                 "cache_func": cache_name,
                 "keys": keys,
-                "invalidation_ts": self.clock.time_msec(),
+                "invalidation_ts": self._clock.time_msec(),
             },
         )
synapse/storage/databases/main/censor_events.py

@@ -17,10 +17,8 @@ from typing import TYPE_CHECKING, Optional
 
 from synapse.events.utils import prune_event_dict
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
-from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.util import json_encoder
 
 if TYPE_CHECKING:
@@ -30,7 +28,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBaseStore):
+class CensorEventsStore(CacheInvalidationWorkerStore):
     def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
synapse/storage/databases/main/event_forward_extremities.py

@@ -18,15 +18,11 @@ from typing import Any, Dict, List
 
 from synapse.api.errors import SynapseError
 from synapse.storage.database import LoggingTransaction
 from synapse.storage.databases.main import CacheInvalidationWorkerStore
-from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
 
 logger = logging.getLogger(__name__)
 
 
-class EventForwardExtremitiesStore(
-    EventFederationWorkerStore,
-    CacheInvalidationWorkerStore,
-):
+class EventForwardExtremitiesStore(CacheInvalidationWorkerStore):
     async def delete_forward_extremities_for_room(self, room_id: str) -> int:
         """Delete any extra forward extremities for a room.
synapse/storage/databases/main/push_rule.py

@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Dict, List, Tuple, Union
 from synapse.api.errors import NotFoundError, StoreError
 from synapse.push.baserules import list_with_base_rules
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
-from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.storage._base import db_to_json
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
@@ -71,7 +71,6 @@ class PushRulesWorkerStore(
     PusherWorkerStore,
     RoomMemberWorkerStore,
     EventsWorkerStore,
-    SQLBaseStore,
     metaclass=abc.ABCMeta,
 ):
     """This is an abstract base class where subclasses must implement
synapse/storage/databases/main/registration.py

@@ -1877,7 +1877,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
         return res if res else False
 
 
-class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
+class RegistrationStore(RegistrationBackgroundUpdateStore, StatsStore):
     def __init__(
         self,
         database: DatabasePool,
synapse/storage/databases/main/stats.py

@@ -100,7 +100,6 @@ class StatsStore(StateDeltasStore):
         super().__init__(database, db_conn, hs)
 
         self.server_name = hs.hostname
-        self.clock = self.hs.get_clock()
         self.stats_enabled = hs.config.stats.stats_enabled
 
         self.stats_delta_processing_lock = DeferredLock()
@@ -601,7 +600,7 @@ class StatsStore(StateDeltasStore):
         local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)]
 
         await self.update_stats_delta(
-            ts=self.clock.time_msec(),
+            ts=self._clock.time_msec(),
             stats_type="room",
             stats_id=room_id,
             fields={},
@@ -638,7 +637,7 @@ class StatsStore(StateDeltasStore):
         )
 
         await self.update_stats_delta(
-            ts=self.clock.time_msec(),
+            ts=self._clock.time_msec(),
             stats_type="user",
             stats_id=user_id,
             fields={},
synapse/visibility.py

@@ -146,7 +146,9 @@ async def filter_events_for_client(
         max_lifetime = retention_policy.get("max_lifetime")
 
         if max_lifetime is not None:
-            oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime
+            # TODO: reveal_type(storage.main) yields Any. Can we find a way of
+            # telling mypy that storage.main is a generic `DataStoreT`?
+            oldest_allowed_ts = storage.main._clock.time_msec() - max_lifetime
 
             if event.origin_server_ts < oldest_allowed_ts:
                 return None
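On the TODO here: `storage.main` is currently typed as `Any`, so mypy could not have caught the stale `.clock` attribute this hunk fixes. One hedged direction, as the comment suggests, is to make the storage wrapper generic in its main-store type (an illustrative sketch, not Synapse code):

```python
from typing import Generic, TypeVar

DataStoreT = TypeVar("DataStoreT")


class Storage(Generic[DataStoreT]):
    """Sketch: a storage wrapper parameterized over its main data store."""

    def __init__(self, main: DataStoreT) -> None:
        self.main = main


# With `storage: Storage[DataStore]`, reveal_type(storage.main) would be
# DataStore rather than Any, and `storage.main.clock` would be an error.
```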