Compare commits

...

12 Commits

Author · SHA1 · Message · Date
Erik Johnston · 6b2d6fdd33 · Add some debugging · 2020-05-15 15:33:11 +01:00
Erik Johnston · d263a4de02 · Enable moving event persistence off of master · 2020-05-14 17:25:42 +01:00
Erik Johnston · 66c1dff3ba · Use new writers config · 2020-05-14 17:25:41 +01:00
Erik Johnston · 96b6023e3b · Make location of events writer configurable · 2020-05-14 17:25:09 +01:00
Erik Johnston · 452019064c · Allow ReplicationRestResource to be added to workers · 2020-05-14 17:25:09 +01:00
Erik Johnston · 7c8e09bcf1 · Add a worker store for search insertion · 2020-05-14 17:25:09 +01:00
Erik Johnston · e7f5ac4ed8 · Fix lint · 2020-05-14 17:09:58 +01:00
Erik Johnston · 208ab7b135 · Fix typing and add assertion. · 2020-05-14 17:09:58 +01:00
Erik Johnston · 41f558ccf7 · Newsfile · 2020-05-14 17:09:58 +01:00
Erik Johnston · 342796d6ac · Move push rules ID gen to push rules worker · 2020-05-14 17:09:58 +01:00
Erik Johnston · bc3fc3927f · Move events ID gens to EventWorkerStore · 2020-05-14 17:09:58 +01:00
Erik Johnston · d67a8b5455 · Move replication event stream handling out of slave store · 2020-05-14 17:09:58 +01:00
19 changed files with 254 additions and 183 deletions

changelog.d/7491.misc (new file, +1)

@@ -0,0 +1 @@
+Move event stream handling out of slave store.

synapse/app/generic_worker.py

@@ -47,6 +47,7 @@ from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -122,6 +123,7 @@ from synapse.storage.data_stores.main.monthly_active_users import (
     MonthlyActiveUsersWorkerStore,
 )
 from synapse.storage.data_stores.main.presence import UserPresenceState
+from synapse.storage.data_stores.main.search import SearchWorkerStore
 from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
 from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
 from synapse.types import ReadReceipt
@@ -451,6 +453,7 @@ class GenericWorkerSlavedStore(
     SlavedFilteringStore,
     MonthlyActiveUsersWorkerStore,
     MediaRepositoryStore,
+    SearchWorkerStore,
     BaseSlavedStore,
 ):
     def __init__(self, database, db_conn, hs):
@@ -568,6 +571,9 @@ class GenericWorkerServer(HomeServer):
                     if name in ["keys", "federation"]:
                         resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)

+                    if name == "replication":
+                        resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
+
         root_resource = create_resource_tree(resources, NoResource())

         _base.listen_tcp(
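With the hunk above, a generic worker can mount the replication HTTP API itself instead of relying on master. A sketch of the matching worker configuration, expressed as a Python dict standing in for the worker's YAML file (the worker name and port are hypothetical):

```python
# Hypothetical worker configuration (normally YAML); listing "replication"
# under a listener's resources makes this worker serve
# ReplicationRestResource under REPLICATION_PREFIX.
worker_config = {
    "worker_app": "synapse.app.generic_worker",
    "worker_name": "event_persister1",  # hypothetical instance name
    "worker_listeners": [
        {
            "type": "http",
            "port": 9092,  # hypothetical port
            "resources": [{"names": ["replication"]}],
        }
    ],
}
```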

synapse/config/logger.py

@@ -257,5 +257,6 @@ def setup_logging(
     logging.warning("***** STARTING SERVER *****")
     logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse))
     logging.info("Server hostname: %s", config.server_name)
+    logging.info("Instance name: %s", hs.get_instance_name())

     return logger

synapse/config/workers.py

@@ -27,6 +27,17 @@ class InstanceLocationConfig:
     port = attr.ib(type=int)


+@attr.s
+class WriterLocations:
+    """Specifies the instances that write various streams.
+
+    Attributes:
+        events: The instance that writes to the event and backfill streams.
+    """
+
+    events = attr.ib(default="master", type=str)
+
+
 class WorkerConfig(Config):
     """The workers are processes run separately to the main synapse process.
     They have their own pid_file and listener configuration. They use the
@@ -88,6 +99,10 @@ class WorkerConfig(Config):
             name: InstanceLocationConfig(**c) for name, c in instance_map.items()
         }

+        # Map from type of streams to source, c.f. WriterLocations.
+        writers = config.get("writers", {}) or {}
+        self.writers = WriterLocations(**writers)
+
     def read_arguments(self, args):
         # We support a bunch of command line arguments that override options in
         # the config. A lot of these options have a worker_* prefix when running
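To make the new `writers` section concrete, here is a minimal, self-contained sketch of how the attrs class above consumes it; the worker name `event_persister1` is purely illustrative:

```python
import attr


@attr.s
class WriterLocations:
    """Mirror of the class above: which instance writes each stream."""

    events = attr.ib(default="master", type=str)


# Stands in for a homeserver.yaml fragment like:
#   writers:
#     events: event_persister1
config = {"writers": {"events": "event_persister1"}}

writers = WriterLocations(**(config.get("writers", {}) or {}))
assert writers.events == "event_persister1"

# With no `writers` section, everything defaults to master.
assert WriterLocations().events == "master"
```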

synapse/handlers/federation.py

@@ -125,10 +125,9 @@ class FederationHandler(BaseHandler):
         self._server_notices_mxid = hs.config.server_notices_mxid
         self.config = hs.config
         self.http_client = hs.get_simple_http_client()
+        self._instance_name = hs.get_instance_name()

-        self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client(
-            hs
-        )
+        self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
         self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
             hs
         )
@@ -2837,8 +2836,9 @@ class FederationHandler(BaseHandler):
             backfilled: Whether these events are a result of
                 backfilling or not
         """
-        if self.config.worker_app:
-            await self._send_events_to_master(
+        if self.config.worker.writers.events != self._instance_name:
+            await self._send_events(
+                instance_name=self.config.worker.writers.events,
                 store=self.store,
                 event_and_contexts=event_and_contexts,
                 backfilled=backfilled,

synapse/handlers/message.py

@@ -365,10 +365,11 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
         self.require_membership_for_aliases = hs.config.require_membership_for_aliases
+        self._instance_name = hs.get_instance_name()

         self.room_invite_state_types = self.hs.config.room_invite_state_types

-        self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
+        self.send_event = ReplicationSendEventRestServlet.make_client(hs)

         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -822,8 +823,9 @@ class EventCreationHandler(object):
         success = False
         try:
             # If we're a worker we need to hit out to the master.
-            if self.config.worker_app:
-                await self.send_event_to_master(
+            if self.config.worker.writers.events != self._instance_name:
+                await self.send_event(
+                    instance_name=self.config.worker.writers.events,
                     event_id=event.event_id,
                     store=self.store,
                     requester=requester,
@@ -888,7 +890,7 @@ class EventCreationHandler(object):
         This should only be run on master.
         """

-        assert not self.config.worker_app
+        assert self.config.worker.writers.events == self._instance_name

         if ratelimit:
             # We check if this is a room admin redacting an event so that we
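FederationHandler and EventCreationHandler now share the same routing pattern: compare the configured events writer against the local instance name, then either persist directly or forward over the replication HTTP API. A condensed, self-contained sketch of that decision (the callables stand in for the real persist/forward paths):

```python
from typing import Awaitable, Callable


async def route_persist(
    events_writer: str,
    local_instance: str,
    persist_locally: Callable[[], Awaitable[None]],
    forward_to_writer: Callable[[str], Awaitable[None]],
) -> None:
    # Only the configured events writer persists directly; every other
    # instance forwards the event, naming the writer so the replication
    # client can route the HTTP request to the right process.
    if events_writer != local_instance:
        await forward_to_writer(events_writer)
    else:
        await persist_locally()
```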

synapse/replication/http/__init__.py

@@ -34,9 +34,11 @@ class ReplicationRestResource(JsonResource):
     def register_servlets(self, hs):
         send_event.register_servlets(hs, self)
-        membership.register_servlets(hs, self)
         federation.register_servlets(hs, self)
-        login.register_servlets(hs, self)
-        register.register_servlets(hs, self)
-        devices.register_servlets(hs, self)
-        streams.register_servlets(hs, self)
+
+        if hs.config.worker.worker_app is None:
+            membership.register_servlets(hs, self)
+            login.register_servlets(hs, self)
+            register.register_servlets(hs, self)
+            devices.register_servlets(hs, self)
+            streams.register_servlets(hs, self)

synapse/replication/http/_base.py

@@ -142,6 +142,7 @@ class ReplicationEndpoint(object):
         """
         clock = hs.get_clock()
         client = hs.get_simple_http_client()
+        local_instance_name = hs.get_instance_name()

         master_host = hs.config.worker_replication_host
         master_port = hs.config.worker_replication_http_port
@@ -151,6 +152,8 @@ class ReplicationEndpoint(object):
         @trace(opname="outgoing_replication_request")
         @defer.inlineCallbacks
         def send_request(instance_name="master", **kwargs):
+            if instance_name == local_instance_name:
+                raise Exception("Trying to send HTTP request to self")
             if instance_name == "master":
                 host = master_host
                 port = master_port
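The new guard assumes `send_request` can now be pointed at instances other than master; the branch that resolves such names is not shown in this hunk. A sketch of how resolution can work, assuming the `instance_map` config (name to host/port) introduced with `InstanceLocationConfig` above:

```python
def resolve_replication_target(instance_name, master_host, master_port, instance_map):
    # Sketch only: the instance_map branch is an assumption based on the
    # InstanceLocationConfig seen in the workers config diff, not a
    # verbatim copy of this PR's code.
    if instance_name == "master":
        return master_host, master_port
    if instance_name in instance_map:
        location = instance_map[instance_name]
        return location.host, location.port
    raise Exception("Instance %r not in 'instance_map' config" % (instance_name,))
```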

synapse/replication/slave/storage/events.py

@@ -15,11 +15,6 @@
 # limitations under the License.
 import logging

-from synapse.api.constants import EventTypes
-from synapse.replication.tcp.streams.events import (
-    EventsStreamCurrentStateRow,
-    EventsStreamEventRow,
-)
 from synapse.storage.data_stores.main.event_federation import EventFederationWorkerStore
 from synapse.storage.data_stores.main.event_push_actions import (
     EventPushActionsWorkerStore,
@@ -35,7 +30,6 @@ from synapse.storage.database import Database
 from synapse.util.caches.stream_change_cache import StreamChangeCache

 from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker

 logger = logging.getLogger(__name__)
@@ -62,11 +56,6 @@ class SlavedEventStore(
     BaseSlavedStore,
 ):
     def __init__(self, database: Database, db_conn, hs):
-        self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
-        self._backfill_id_gen = SlavedIdTracker(
-            db_conn, "events", "stream_ordering", step=-1
-        )
         super(SlavedEventStore, self).__init__(database, db_conn, hs)

         events_max = self._stream_id_gen.get_current_token()
@@ -92,81 +81,3 @@ class SlavedEventStore(
     def get_room_min_stream_ordering(self):
         return self._backfill_id_gen.get_current_token()
-
-    def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "events":
-            self._stream_id_gen.advance(token)
-            for row in rows:
-                self._process_event_stream_row(token, row)
-        elif stream_name == "backfill":
-            self._backfill_id_gen.advance(-token)
-            for row in rows:
-                self.invalidate_caches_for_event(
-                    -token,
-                    row.event_id,
-                    row.room_id,
-                    row.type,
-                    row.state_key,
-                    row.redacts,
-                    row.relates_to,
-                    backfilled=True,
-                )
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
-
-    def _process_event_stream_row(self, token, row):
-        data = row.data
-
-        if row.type == EventsStreamEventRow.TypeId:
-            self.invalidate_caches_for_event(
-                token,
-                data.event_id,
-                data.room_id,
-                data.type,
-                data.state_key,
-                data.redacts,
-                data.relates_to,
-                backfilled=False,
-            )
-        elif row.type == EventsStreamCurrentStateRow.TypeId:
-            self._curr_state_delta_stream_cache.entity_has_changed(
-                row.data.room_id, token
-            )
-
-            if data.type == EventTypes.Member:
-                self.get_rooms_for_user_with_stream_ordering.invalidate(
-                    (data.state_key,)
-                )
-        else:
-            raise Exception("Unknown events stream row type %s" % (row.type,))
-
-    def invalidate_caches_for_event(
-        self,
-        stream_ordering,
-        event_id,
-        room_id,
-        etype,
-        state_key,
-        redacts,
-        relates_to,
-        backfilled,
-    ):
-        self._invalidate_get_event_cache(event_id)
-
-        self.get_latest_event_ids_in_room.invalidate((room_id,))
-
-        self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
-
-        if not backfilled:
-            self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
-
-        if redacts:
-            self._invalidate_get_event_cache(redacts)
-
-        if etype == EventTypes.Member:
-            self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
-            self.get_invited_rooms_for_local_user.invalidate((state_key,))
-
-        if relates_to:
-            self.get_relations_for_event.invalidate_many((relates_to,))
-            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
-            self.get_applicable_edit.invalidate((relates_to,))

synapse/replication/slave/storage/push_rule.py

@@ -15,19 +15,11 @@
 # limitations under the License.

 from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore
-from synapse.storage.database import Database

-from ._slaved_id_tracker import SlavedIdTracker
 from .events import SlavedEventStore


 class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
-    def __init__(self, database: Database, db_conn, hs):
-        self._push_rules_stream_id_gen = SlavedIdTracker(
-            db_conn, "push_rules_stream", "stream_id"
-        )
-        super(SlavedPushRuleStore, self).__init__(database, db_conn, hs)
-
     def get_push_rules_stream_token(self):
         return (
             self._push_rules_stream_id_gen.get_current_token(),

synapse/replication/tcp/handler.py

@@ -38,7 +38,9 @@ from synapse.replication.tcp.commands import (
 from synapse.replication.tcp.protocol import AbstractConnection
 from synapse.replication.tcp.streams import (
     STREAMS_MAP,
+    BackfillStream,
     CachesStream,
+    EventsStream,
     FederationStream,
     Stream,
 )
@@ -87,6 +89,12 @@ class ReplicationCommandHandler:
                 self._streams_to_replicate.append(stream)
                 continue

+            if (
+                isinstance(stream, (EventsStream, BackfillStream))
+                and hs.config.worker.writers.events == hs.get_instance_name()
+            ):
+                self._streams_to_replicate.append(stream)
+
             # Only add any other streams if we're on master.
             if hs.config.worker_app is not None:
                 continue
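The upshot: the events and backfill streams are now sourced from whichever instance is the configured events writer, while every other stream is still sourced only from master. A toy predicate capturing that selection:

```python
def should_source_stream(stream_kind, instance_name, events_writer, is_master):
    # Toy restatement of the loop above: events/backfill follow the
    # configured writer, everything else still lives on master.
    if stream_kind in ("events", "backfill"):
        return instance_name == events_writer
    return is_master


assert should_source_stream("events", "event_persister1", "event_persister1", False)
assert not should_source_stream("caches", "event_persister1", "event_persister1", False)
```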

synapse/storage/data_stores/__init__.py

@@ -66,9 +66,9 @@ class DataStores(object):
                 self.main = main_store_class(database, db_conn, hs)

-                # If we're on a process that can persist events (currently
-                # master), also instantiate a `PersistEventsStore`
-                if hs.config.worker.worker_app is None:
+                # If we're on a process that can persist events, also
+                # instantiate a `PersistEventsStore`
+                if hs.config.worker.writers.events == hs.get_instance_name():
                     self.persist_events = PersistEventsStore(
                         hs, database, self.main
                     )

synapse/storage/data_stores/main/__init__.py

@@ -24,7 +24,6 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.storage.database import Database
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
-    ChainedIdGenerator,
     IdGenerator,
     MultiWriterIdGenerator,
     StreamIdGenerator,
@@ -125,19 +124,6 @@ class DataStore(
         self._clock = hs.get_clock()
         self.database_engine = database.engine

-        self._stream_id_gen = StreamIdGenerator(
-            db_conn,
-            "events",
-            "stream_ordering",
-            extra_tables=[("local_invites", "stream_id")],
-        )
-        self._backfill_id_gen = StreamIdGenerator(
-            db_conn,
-            "events",
-            "stream_ordering",
-            step=-1,
-            extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-        )
         self._presence_id_gen = StreamIdGenerator(
             db_conn, "presence_stream", "stream_id"
         )
@@ -164,9 +150,6 @@ class DataStore(
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
         self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
         self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
-        self._push_rules_stream_id_gen = ChainedIdGenerator(
-            self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
-        )
         self._pushers_id_gen = StreamIdGenerator(
             db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
         )

synapse/storage/data_stores/main/cache.py

@@ -16,8 +16,13 @@
 import itertools
 import logging
-from typing import Any, Iterable, Optional
+from typing import Any, Iterable, Optional, Tuple

+from synapse.api.constants import EventTypes
+from synapse.replication.tcp.streams.events import (
+    EventsStreamCurrentStateRow,
+    EventsStreamEventRow,
+)
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import Database
 from synapse.storage.engines import PostgresEngine
@@ -66,7 +71,24 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         )

     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "caches":
+        if stream_name == "events":
+            self._stream_id_gen.advance(token)
+            for row in rows:
+                self._process_event_stream_row(token, row)
+        elif stream_name == "backfill":
+            self._backfill_id_gen.advance(-token)
+            for row in rows:
+                self._invalidate_caches_for_event(
+                    -token,
+                    row.event_id,
+                    row.room_id,
+                    row.type,
+                    row.state_key,
+                    row.redacts,
+                    row.relates_to,
+                    backfilled=True,
+                )
+        elif stream_name == "caches":
             if self._cache_id_gen:
                 self._cache_id_gen.advance(instance_name, token)
@@ -85,6 +107,84 @@ class CacheInvalidationWorkerStore(SQLBaseStore):

         super().process_replication_rows(stream_name, instance_name, token, rows)

+    def _process_event_stream_row(self, token, row):
+        data = row.data
+
+        if row.type == EventsStreamEventRow.TypeId:
+            self._invalidate_caches_for_event(
+                token,
+                data.event_id,
+                data.room_id,
+                data.type,
+                data.state_key,
+                data.redacts,
+                data.relates_to,
+                backfilled=False,
+            )
+        elif row.type == EventsStreamCurrentStateRow.TypeId:
+            self._curr_state_delta_stream_cache.entity_has_changed(
+                row.data.room_id, token
+            )
+
+            if data.type == EventTypes.Member:
+                self.get_rooms_for_user_with_stream_ordering.invalidate(
+                    (data.state_key,)
+                )
+        else:
+            raise Exception("Unknown events stream row type %s" % (row.type,))
+
+    def _invalidate_caches_for_event(
+        self,
+        stream_ordering,
+        event_id,
+        room_id,
+        etype,
+        state_key,
+        redacts,
+        relates_to,
+        backfilled,
+    ):
+        self._invalidate_get_event_cache(event_id)
+
+        self.get_latest_event_ids_in_room.invalidate((room_id,))
+
+        self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
+
+        if not backfilled:
+            self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
+
+        if redacts:
+            self._invalidate_get_event_cache(redacts)
+
+        if etype == EventTypes.Member:
+            self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
+            self.get_invited_rooms_for_local_user.invalidate((state_key,))
+
+        if relates_to:
+            self.get_relations_for_event.invalidate_many((relates_to,))
+            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
+            self.get_applicable_edit.invalidate((relates_to,))
+
+    async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
+        """Invalidates the cache and adds it to the cache stream so slaves
+        will know to invalidate their caches.
+
+        This should only be used to invalidate caches where slaves won't
+        otherwise know from other replication streams that the cache should
+        be invalidated.
+        """
+        cache_func = getattr(self, cache_name, None)
+        if not cache_func:
+            return
+
+        cache_func.invalidate(keys)
+        await self.db.runInteraction(
+            "invalidate_cache_and_stream",
+            self._send_invalidation_to_replication,
+            cache_func.__name__,
+            keys,
+        )
+
     def _invalidate_cache_and_stream(self, txn, cache_func, keys):
         """Invalidates the cache and adds it to the cache stream so slaves
         will know to invalidate their caches.
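Note that the new async `invalidate_cache_and_stream` looks the cache up by *name* via `getattr`, so callers can invalidate caches defined on a different store class in the hierarchy. A hypothetical call site (the cache name and key are illustrative):

```python
async def on_membership_change(store, user_id: str) -> None:
    # Hypothetical caller: invalidating by name works even when the cache
    # is defined elsewhere in the store hierarchy, and the invalidation is
    # also written to the caches stream so other workers follow suit.
    await store.invalidate_cache_and_stream(
        "get_rooms_for_user_with_stream_ordering", (user_id,)
    )
```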

synapse/storage/data_stores/main/events.py

@@ -138,10 +138,10 @@ class PersistEventsStore:
         self._backfill_id_gen = self.store._backfill_id_gen  # type: StreamIdGenerator
         self._stream_id_gen = self.store._stream_id_gen  # type: StreamIdGenerator

-        # This should only exist on master for now
+        # This should only exist on instances that are configured to write
         assert (
-            hs.config.worker.worker_app is None
-        ), "Can only instantiate PersistEventsStore on master"
+            hs.config.worker.writers.events == hs.get_instance_name()
+        ), "Can only instantiate EventsStore on master"

     @_retry_on_integrity_error
     @defer.inlineCallbacks

synapse/storage/data_stores/main/events_worker.py

@@ -37,8 +37,10 @@ from synapse.events import make_event_from_dict
 from synapse.events.utils import prune_event
 from synapse.logging.context import PreserveLoggingContext, current_context
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.database import Database
+from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import get_domain_from_id
 from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks
 from synapse.util.iterutils import batch_iter
@@ -74,6 +76,26 @@ class EventsWorkerStore(SQLBaseStore):
     def __init__(self, database: Database, db_conn, hs):
         super(EventsWorkerStore, self).__init__(database, db_conn, hs)

+        if hs.config.worker.writers.events == hs.get_instance_name():
+            self._stream_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                extra_tables=[("local_invites", "stream_id")],
+            )
+            self._backfill_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                step=-1,
+                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+            )
+        else:
+            self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
+            self._backfill_id_gen = SlavedIdTracker(
+                db_conn, "events", "stream_ordering", step=-1
+            )
+
         self._get_event_cache = Cache(
             "*getEvent*",
             keylen=3,
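This split is the heart of the PR: exactly one instance owns the `StreamIdGenerator`s that allocate new stream orderings, and every other instance merely tracks them. A minimal sketch of the follower-side contract, modelled on `SlavedIdTracker` (the real backfill stream runs backwards with step=-1; that wrinkle is omitted here):

```python
class FollowerIdTracker:
    """Sketch of the tracker used on non-writer instances: it never hands
    out IDs, it only records the highest token seen over replication."""

    def __init__(self, current_token: int):
        self._current = current_token

    def advance(self, token: int) -> None:
        # Called from process_replication_rows as rows stream past.
        self._current = max(self._current, token)

    def get_current_token(self) -> int:
        return self._current


tracker = FollowerIdTracker(10)
tracker.advance(12)
assert tracker.get_current_token() == 12
```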

synapse/storage/data_stores/main/push_rule.py

@@ -16,12 +16,14 @@
 import abc
 import logging
+from typing import Union

 from canonicaljson import json

 from twisted.internet import defer

 from synapse.push.baserules import list_with_base_rules
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.data_stores.main.appservice import ApplicationServiceWorkerStore
 from synapse.storage.data_stores.main.pusher import PusherWorkerStore
@@ -29,6 +31,7 @@ from synapse.storage.data_stores.main.receipts import ReceiptsWorkerStore
 from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
 from synapse.storage.database import Database
 from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
+from synapse.storage.util.id_generators import ChainedIdGenerator
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -77,6 +80,15 @@ class PushRulesWorkerStore(
     def __init__(self, database: Database, db_conn, hs):
         super(PushRulesWorkerStore, self).__init__(database, db_conn, hs)

+        if hs.config.worker.worker_app is None:
+            self._push_rules_stream_id_gen = ChainedIdGenerator(
+                self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
+            )  # type: Union[ChainedIdGenerator, SlavedIdTracker]
+        else:
+            self._push_rules_stream_id_gen = SlavedIdTracker(
+                db_conn, "push_rules_stream", "stream_id"
+            )
+
         push_rules_prefill, push_rules_id = self.db.get_cache_dict(
             db_conn,
             "push_rules_stream",

synapse/storage/data_stores/main/search.py

@@ -37,7 +37,55 @@ SearchEntry = namedtuple(
 )


-class SearchBackgroundUpdateStore(SQLBaseStore):
+class SearchWorkerStore(SQLBaseStore):
+    def store_search_entries_txn(self, txn, entries):
+        """Add entries to the search table
+
+        Args:
+            txn (cursor):
+            entries (iterable[SearchEntry]):
+                entries to be added to the table
+        """
+        if not self.hs.config.enable_search:
+            return
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = (
+                "INSERT INTO event_search"
+                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
+                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
+            )
+
+            args = (
+                (
+                    entry.event_id,
+                    entry.room_id,
+                    entry.key,
+                    entry.value,
+                    entry.stream_ordering,
+                    entry.origin_server_ts,
+                )
+                for entry in entries
+            )
+
+            txn.executemany(sql, args)
+
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql = (
+                "INSERT INTO event_search (event_id, room_id, key, value)"
+                " VALUES (?,?,?,?)"
+            )
+            args = (
+                (entry.event_id, entry.room_id, entry.key, entry.value)
+                for entry in entries
+            )
+
+            txn.executemany(sql, args)
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+
+class SearchBackgroundUpdateStore(SearchWorkerStore):

     EVENT_SEARCH_UPDATE_NAME = "event_search"
     EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
@@ -296,52 +344,6 @@ class SearchBackgroundUpdateStore(SQLBaseStore):
         return num_rows

-    def store_search_entries_txn(self, txn, entries):
-        """Add entries to the search table
-
-        Args:
-            txn (cursor):
-            entries (iterable[SearchEntry]):
-                entries to be added to the table
-        """
-        if not self.hs.config.enable_search:
-            return
-        if isinstance(self.database_engine, PostgresEngine):
-            sql = (
-                "INSERT INTO event_search"
-                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
-                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
-            )
-
-            args = (
-                (
-                    entry.event_id,
-                    entry.room_id,
-                    entry.key,
-                    entry.value,
-                    entry.stream_ordering,
-                    entry.origin_server_ts,
-                )
-                for entry in entries
-            )
-
-            txn.executemany(sql, args)
-
-        elif isinstance(self.database_engine, Sqlite3Engine):
-            sql = (
-                "INSERT INTO event_search (event_id, room_id, key, value)"
-                " VALUES (?,?,?,?)"
-            )
-            args = (
-                (entry.event_id, entry.room_id, entry.key, entry.value)
-                for entry in entries
-            )
-
-            txn.executemany(sql, args)
-        else:
-            # This should be unreachable.
-            raise Exception("Unrecognized database engine")
-

 class SearchStore(SearchBackgroundUpdateStore):
     def __init__(self, database: Database, db_conn, hs):
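Hoisting `store_search_entries_txn` into a `SearchWorkerStore` lets the event-persisting worker insert search entries even though it is not master. The SQLite branch can be exercised standalone with the stdlib driver; the table below is cut down to the four inserted columns:

```python
import sqlite3

# Standalone demonstration of the SQLite code path above.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event_search (event_id TEXT, room_id TEXT, key TEXT, value TEXT)"
)

entries = [
    ("$event1:example.com", "!room:example.com", "content.body", "hello world"),
]
conn.executemany(
    "INSERT INTO event_search (event_id, room_id, key, value) VALUES (?,?,?,?)",
    entries,
)
print(conn.execute("SELECT count(*) FROM event_search").fetchone()[0])  # prints 1
```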

synapse/storage/util/id_generators.py

@@ -166,6 +166,7 @@ class ChainedIdGenerator(object):
     def __init__(self, chained_generator, db_conn, table, column):
         self.chained_generator = chained_generator
+        self._table = table
         self._lock = threading.Lock()
         self._current_max = _load_current_id(db_conn, table, column)
         self._unfinished_ids = deque()  # type: Deque[Tuple[int, int]]
@@ -204,6 +205,16 @@ class ChainedIdGenerator(object):
             return self._current_max, self.chained_generator.get_current_token()

+    def advance(self, token: int):
+        """Stub implementation for advancing the token when receiving updates
+        over replication; raises an exception as this instance should be the
+        only source of updates.
+        """
+
+        raise Exception(
+            "Attempted to advance token on source for table %r", self._table
+        )
+

 class MultiWriterIdGenerator:
     """An ID generator that tracks a stream that can have multiple writers.