mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-09 01:30:18 +00:00
Compare commits
3 Commits
erikj/calc
...
erikj/move
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
83bc79d2cc | ||
|
|
47d7f2035d | ||
|
|
883a5306e1 |
1
changelog.d/9483.misc
Normal file
1
changelog.d/9483.misc
Normal file
@@ -0,0 +1 @@
|
||||
Refactor to make `SQLBaseStore` to be specific to the main database.
|
||||
1
mypy.ini
1
mypy.ini
@@ -45,6 +45,7 @@ files =
|
||||
synapse/storage/__init__.py,
|
||||
synapse/storage/_base.py,
|
||||
synapse/storage/background_updates.py,
|
||||
synapse/storage/databases/main/_base.py,
|
||||
synapse/storage/databases/main/appservice.py,
|
||||
synapse/storage/databases/main/events.py,
|
||||
synapse/storage/databases/main/keys.py,
|
||||
|
||||
@@ -8,7 +8,7 @@ from synapse.crypto.event_signing import (
|
||||
compute_pdu_event_reference_hash,
|
||||
)
|
||||
from synapse.federation.units import Pdu
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.pdu import PduStore
|
||||
from synapse.storage.signatures import SignatureStore
|
||||
|
||||
|
||||
@@ -15,90 +15,15 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import random
|
||||
from abc import ABCMeta
|
||||
from typing import TYPE_CHECKING, Any, Iterable, Optional, Union
|
||||
from typing import Any, Union
|
||||
|
||||
from synapse.storage.database import LoggingTransaction # noqa: F401
|
||||
from synapse.storage.database import make_in_list_sql_clause # noqa: F401
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.types import Connection
|
||||
from synapse.types import Collection, StreamToken, get_domain_from_id
|
||||
from synapse.util import json_decoder
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.app.homeserver import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# some of our subclasses have abstract methods, so we use the ABCMeta metaclass.
|
||||
class SQLBaseStore(metaclass=ABCMeta):
|
||||
"""Base class for data stores that holds helper functions.
|
||||
|
||||
Note that multiple instances of this class will exist as there will be one
|
||||
per data store (and not one per physical database).
|
||||
"""
|
||||
|
||||
def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self._clock = hs.get_clock()
|
||||
self.database_engine = database.engine
|
||||
self.db_pool = database
|
||||
self.rand = random.SystemRandom()
|
||||
|
||||
def process_replication_rows(
|
||||
self,
|
||||
stream_name: str,
|
||||
instance_name: str,
|
||||
token: StreamToken,
|
||||
rows: Iterable[Any],
|
||||
) -> None:
|
||||
pass
|
||||
|
||||
def _invalidate_state_caches(
|
||||
self, room_id: str, members_changed: Iterable[str]
|
||||
) -> None:
|
||||
"""Invalidates caches that are based on the current state, but does
|
||||
not stream invalidations down replication.
|
||||
|
||||
Args:
|
||||
room_id: Room where state changed
|
||||
members_changed: The user_ids of members that have changed
|
||||
"""
|
||||
for host in {get_domain_from_id(u) for u in members_changed}:
|
||||
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
|
||||
|
||||
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))
|
||||
|
||||
def _attempt_to_invalidate_cache(
|
||||
self, cache_name: str, key: Optional[Collection[Any]]
|
||||
) -> None:
|
||||
"""Attempts to invalidate the cache of the given name, ignoring if the
|
||||
cache doesn't exist. Mainly used for invalidating caches on workers,
|
||||
where they may not have the cache.
|
||||
|
||||
Args:
|
||||
cache_name
|
||||
key: Entry to invalidate. If None then invalidates the entire
|
||||
cache.
|
||||
"""
|
||||
|
||||
try:
|
||||
cache = getattr(self, cache_name)
|
||||
except AttributeError:
|
||||
# We probably haven't pulled in the cache in this worker,
|
||||
# which is fine.
|
||||
return
|
||||
|
||||
if key is None:
|
||||
cache.invalidate_all()
|
||||
else:
|
||||
cache.invalidate(tuple(key))
|
||||
|
||||
|
||||
def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
|
||||
"""
|
||||
Take some data from a database row and return a JSON-decoded object.
|
||||
@@ -122,5 +47,5 @@ def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
|
||||
try:
|
||||
return json_decoder.decode(db_content)
|
||||
except Exception:
|
||||
logging.warning("Tried to decode '%r' as JSON and failed", db_content)
|
||||
logger.warning("Tried to decode '%r' as JSON and failed", db_content)
|
||||
raise
|
||||
|
||||
96
synapse/storage/databases/main/_base.py
Normal file
96
synapse/storage/databases/main/_base.py
Normal file
@@ -0,0 +1,96 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2017-2018 New Vector Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import random
|
||||
from abc import ABCMeta
|
||||
from typing import TYPE_CHECKING, Any, Iterable, Optional
|
||||
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.types import Connection
|
||||
from synapse.types import Collection, StreamToken, get_domain_from_id
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.app.homeserver import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# some of our subclasses have abstract methods, so we use the ABCMeta metaclass.
|
||||
class SQLBaseStore(metaclass=ABCMeta):
|
||||
"""Base class for data stores that holds helper functions.
|
||||
|
||||
Note that multiple instances of this class will exist as there will be one
|
||||
per data store (and not one per physical database).
|
||||
"""
|
||||
|
||||
def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self._clock = hs.get_clock()
|
||||
self.database_engine = database.engine
|
||||
self.db_pool = database
|
||||
self.rand = random.SystemRandom()
|
||||
|
||||
def process_replication_rows(
|
||||
self,
|
||||
stream_name: str,
|
||||
instance_name: str,
|
||||
token: StreamToken,
|
||||
rows: Iterable[Any],
|
||||
) -> None:
|
||||
pass
|
||||
|
||||
def _invalidate_state_caches(
|
||||
self, room_id: str, members_changed: Iterable[str]
|
||||
) -> None:
|
||||
"""Invalidates caches that are based on the current state, but does
|
||||
not stream invalidations down replication.
|
||||
|
||||
Args:
|
||||
room_id: Room where state changed
|
||||
members_changed: The user_ids of members that have changed
|
||||
"""
|
||||
for host in {get_domain_from_id(u) for u in members_changed}:
|
||||
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
|
||||
|
||||
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))
|
||||
|
||||
def _attempt_to_invalidate_cache(
|
||||
self, cache_name: str, key: Optional[Collection[Any]]
|
||||
) -> None:
|
||||
"""Attempts to invalidate the cache of the given name, ignoring if the
|
||||
cache doesn't exist. Mainly used for invalidating caches on workers,
|
||||
where they may not have the cache.
|
||||
|
||||
Args:
|
||||
cache_name
|
||||
key: Entry to invalidate. If None then invalidates the entire
|
||||
cache.
|
||||
"""
|
||||
|
||||
try:
|
||||
cache = getattr(self, cache_name)
|
||||
except AttributeError:
|
||||
# We probably haven't pulled in the cache in this worker,
|
||||
# which is fine.
|
||||
return
|
||||
|
||||
if key is None:
|
||||
cache.invalidate_all()
|
||||
else:
|
||||
cache.invalidate(tuple(key))
|
||||
@@ -20,8 +20,9 @@ from typing import Dict, List, Optional, Set, Tuple
|
||||
from synapse.api.constants import AccountDataTypes
|
||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
||||
from synapse.types import JsonDict
|
||||
|
||||
@@ -24,8 +24,9 @@ from synapse.appservice import (
|
||||
)
|
||||
from synapse.config.appservice import load_appservices
|
||||
from synapse.events import EventBase
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.types import Connection
|
||||
from synapse.types import JsonDict
|
||||
|
||||
@@ -25,8 +25,8 @@ from synapse.replication.tcp.streams.events import (
|
||||
EventsStreamCurrentStateRow,
|
||||
EventsStreamEventRow,
|
||||
)
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.util.iterutils import batch_iter
|
||||
|
||||
|
||||
@@ -18,8 +18,8 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from synapse.events.utils import prune_event_dict
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.util import json_encoder
|
||||
|
||||
@@ -17,8 +17,8 @@ import logging
|
||||
from typing import Dict, List, Optional, Tuple, Union
|
||||
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.types import UserID
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
|
||||
|
||||
@@ -18,8 +18,9 @@ from typing import List, Tuple
|
||||
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.replication.tcp.streams import ToDeviceStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
||||
from synapse.util import json_encoder
|
||||
|
||||
@@ -26,12 +26,13 @@ from synapse.logging.opentracing import (
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage._base import db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingTransaction,
|
||||
make_tuple_comparison_clause,
|
||||
)
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key
|
||||
from synapse.util import json_decoder, json_encoder
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
|
||||
@@ -17,7 +17,7 @@ from collections import namedtuple
|
||||
from typing import Iterable, List, Optional
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.types import RoomAlias
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
|
||||
@@ -18,7 +18,8 @@ from typing import Optional
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.logging.opentracing import log_kv, trace
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.util import json_encoder
|
||||
|
||||
|
||||
|
||||
@@ -23,8 +23,9 @@ from canonicaljson import encode_canonical_json
|
||||
from twisted.enterprise.adbapi import Connection
|
||||
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.types import JsonDict
|
||||
|
||||
@@ -20,8 +20,9 @@ from typing import Dict, Iterable, List, Set, Tuple
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.events import EventBase
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
||||
from synapse.storage._base import make_in_list_sql_clause
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.databases.main.signatures import SignatureWorkerStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
|
||||
@@ -19,8 +19,9 @@ from typing import Dict, List, Optional, Tuple, Union
|
||||
import attr
|
||||
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
|
||||
@@ -21,8 +21,9 @@ import attr
|
||||
from synapse.api.constants import EventContentFields
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events import make_event_from_dict
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage._base import db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.events import PersistEventsStore
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.types import JsonDict
|
||||
|
||||
@@ -17,7 +17,7 @@ import logging
|
||||
from typing import Dict, List
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -41,8 +41,9 @@ from synapse.metrics.background_process_metrics import (
|
||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.replication.tcp.streams import BackfillStream
|
||||
from synapse.replication.tcp.streams.events import EventsStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage._base import db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
||||
from synapse.storage.util.sequence import build_sequence_generator
|
||||
|
||||
@@ -16,7 +16,8 @@
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
|
||||
@@ -19,7 +19,8 @@ from typing import Any, Dict, List, Optional, Tuple
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import json_encoder
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ from typing import Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
|
||||
@@ -16,8 +16,8 @@
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
|
||||
BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD = (
|
||||
"media_repository_drop_index_wo_method"
|
||||
|
||||
@@ -19,8 +19,8 @@ from typing import Dict
|
||||
|
||||
from synapse.metrics import GaugeBucketCollector
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.event_push_actions import (
|
||||
EventPushActionsWorkerStore,
|
||||
)
|
||||
|
||||
@@ -16,8 +16,8 @@ import logging
|
||||
from typing import Dict, List
|
||||
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from typing import Optional
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
|
||||
|
||||
class OpenIdStore(SQLBaseStore):
|
||||
|
||||
@@ -16,7 +16,8 @@
|
||||
from typing import List, Tuple
|
||||
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
||||
from synapse.storage._base import make_in_list_sql_clause
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.iterutils import batch_iter
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.roommember import ProfileInfo
|
||||
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ import logging
|
||||
from typing import Any, List, Set, Tuple
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.state import StateGroupWorkerStore
|
||||
from synapse.types import RoomStreamToken
|
||||
|
||||
|
||||
@@ -20,8 +20,9 @@ from typing import List, Tuple, Union
|
||||
from synapse.api.errors import NotFoundError, StoreError
|
||||
from synapse.push.baserules import list_with_base_rules
|
||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.databases.main.pusher import PusherWorkerStore
|
||||
|
||||
@@ -18,8 +18,9 @@ import logging
|
||||
from typing import TYPE_CHECKING, Any, Dict, Iterable, Iterator, List, Optional, Tuple
|
||||
|
||||
from synapse.push import PusherConfig, ThrottleParams
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.types import Connection
|
||||
from synapse.storage.util.id_generators import StreamIdGenerator
|
||||
from synapse.types import JsonDict
|
||||
|
||||
@@ -21,8 +21,9 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.replication.tcp.streams import ReceiptsStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage._base import db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
||||
from synapse.types import JsonDict
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ import attr
|
||||
|
||||
from synapse.api.constants import RelationTypes
|
||||
from synapse.events import EventBase
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.stream import generate_pagination_where_clause
|
||||
from synapse.storage.relations import (
|
||||
AggregationPaginationToken,
|
||||
|
||||
@@ -23,8 +23,9 @@ from typing import Any, Dict, List, Optional, Tuple
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.api.room_versions import RoomVersion, RoomVersions
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.search import SearchStore
|
||||
from synapse.types import JsonDict, ThirdPartyInstanceID
|
||||
from synapse.util import json_encoder
|
||||
|
||||
@@ -24,8 +24,9 @@ from synapse.metrics.background_process_metrics import (
|
||||
run_as_background_process,
|
||||
wrap_as_background_process,
|
||||
)
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage._base import db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.engines import Sqlite3Engine
|
||||
from synapse.storage.roommember import (
|
||||
|
||||
@@ -20,8 +20,9 @@ from typing import List, Optional, Set
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.events import EventBase
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage._base import db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||
from synapse.types import Collection
|
||||
|
||||
@@ -17,7 +17,7 @@ from typing import Dict, Iterable, List, Tuple
|
||||
|
||||
from unpaddedbase64 import encode_base64
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
|
||||
|
||||
@@ -22,8 +22,8 @@ from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
|
||||
from synapse.events import EventBase
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
|
||||
from synapse.storage.state import StateFilter
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
import logging
|
||||
from typing import Any, Dict, List, Tuple
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -45,12 +45,12 @@ from twisted.internet import defer
|
||||
from synapse.api.filtering import Filter
|
||||
from synapse.events import EventBase
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingTransaction,
|
||||
make_in_list_sql_clause,
|
||||
)
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
|
||||
@@ -20,8 +20,9 @@ from typing import Iterable, List, Optional, Tuple
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
|
||||
@@ -17,8 +17,9 @@ from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
import attr
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import db_to_json
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import json_encoder, stringutils
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
|
||||
import logging
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.state import StateFilter
|
||||
@@ -26,17 +25,20 @@ logger = logging.getLogger(__name__)
|
||||
MAX_STATE_DELTA_HOPS = 100
|
||||
|
||||
|
||||
class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
||||
class StateGroupBackgroundUpdateStore:
|
||||
"""Defines functions related to state groups needed to run the state background
|
||||
updates.
|
||||
"""
|
||||
|
||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||
self.db_pool = database
|
||||
|
||||
def _count_state_group_hops_txn(self, txn, state_group):
|
||||
"""Given a state group, count how many hops there are in the tree.
|
||||
|
||||
This is used to ensure the delta chains don't get too long.
|
||||
"""
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
if isinstance(self.db_pool.engine, PostgresEngine):
|
||||
sql = """
|
||||
WITH RECURSIVE state(state_group) AS (
|
||||
VALUES(?::bigint)
|
||||
@@ -84,7 +86,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
||||
if where_clause:
|
||||
where_clause = " AND (%s)" % (where_clause,)
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
if isinstance(self.db_pool.engine, PostgresEngine):
|
||||
# Temporarily disable sequential scans in this transaction. This is
|
||||
# a temporary hack until we can add the right indices in
|
||||
txn.execute("SET LOCAL enable_seqscan=off")
|
||||
@@ -341,7 +343,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
|
||||
async def _background_index_state(self, progress, batch_size):
|
||||
def reindex_txn(conn):
|
||||
conn.rollback()
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
if isinstance(self.db_pool.engine, PostgresEngine):
|
||||
# postgres insists on autocommit for the index
|
||||
conn.set_session(autocommit=True)
|
||||
try:
|
||||
|
||||
@@ -18,7 +18,6 @@ from collections import namedtuple
|
||||
from typing import Dict, Iterable, List, Set, Tuple
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
|
||||
from synapse.storage.state import StateFilter
|
||||
@@ -47,7 +46,7 @@ class _GetStateGroupDelta(
|
||||
return len(self.delta_ids) if self.delta_ids else 0
|
||||
|
||||
|
||||
class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
class StateGroupDataStore(StateBackgroundUpdateStore):
|
||||
"""A data store for fetching/storing state groups."""
|
||||
|
||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||
@@ -98,7 +97,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
|
||||
self._state_group_seq_gen = build_sequence_generator(
|
||||
db_conn,
|
||||
self.database_engine,
|
||||
self.db_pool.engine,
|
||||
get_max_state_group_txn,
|
||||
"state_group_id_seq",
|
||||
table="state_groups",
|
||||
|
||||
@@ -20,8 +20,8 @@ from mock import Mock
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main._base import SQLBaseStore
|
||||
from synapse.storage.engines import create_engine
|
||||
|
||||
from tests import unittest
|
||||
|
||||
Reference in New Issue
Block a user