mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-11 01:40:27 +00:00
Compare commits
12 Commits
v1.92.1
...
erikj/debu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b2d6fdd33 | ||
|
|
d263a4de02 | ||
|
|
66c1dff3ba | ||
|
|
96b6023e3b | ||
|
|
452019064c | ||
|
|
7c8e09bcf1 | ||
|
|
e7f5ac4ed8 | ||
|
|
208ab7b135 | ||
|
|
41f558ccf7 | ||
|
|
342796d6ac | ||
|
|
bc3fc3927f | ||
|
|
d67a8b5455 |
1
changelog.d/7491.misc
Normal file
1
changelog.d/7491.misc
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Move event stream handling out of slave store.
|
||||||
@@ -47,6 +47,7 @@ from synapse.http.site import SynapseSite
|
|||||||
from synapse.logging.context import LoggingContext
|
from synapse.logging.context import LoggingContext
|
||||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
|
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||||
@@ -122,6 +123,7 @@ from synapse.storage.data_stores.main.monthly_active_users import (
|
|||||||
MonthlyActiveUsersWorkerStore,
|
MonthlyActiveUsersWorkerStore,
|
||||||
)
|
)
|
||||||
from synapse.storage.data_stores.main.presence import UserPresenceState
|
from synapse.storage.data_stores.main.presence import UserPresenceState
|
||||||
|
from synapse.storage.data_stores.main.search import SearchWorkerStore
|
||||||
from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
|
from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
|
||||||
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
|
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
|
||||||
from synapse.types import ReadReceipt
|
from synapse.types import ReadReceipt
|
||||||
@@ -451,6 +453,7 @@ class GenericWorkerSlavedStore(
|
|||||||
SlavedFilteringStore,
|
SlavedFilteringStore,
|
||||||
MonthlyActiveUsersWorkerStore,
|
MonthlyActiveUsersWorkerStore,
|
||||||
MediaRepositoryStore,
|
MediaRepositoryStore,
|
||||||
|
SearchWorkerStore,
|
||||||
BaseSlavedStore,
|
BaseSlavedStore,
|
||||||
):
|
):
|
||||||
def __init__(self, database, db_conn, hs):
|
def __init__(self, database, db_conn, hs):
|
||||||
@@ -568,6 +571,9 @@ class GenericWorkerServer(HomeServer):
|
|||||||
if name in ["keys", "federation"]:
|
if name in ["keys", "federation"]:
|
||||||
resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
|
resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
|
||||||
|
|
||||||
|
if name == "replication":
|
||||||
|
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
|
||||||
|
|
||||||
root_resource = create_resource_tree(resources, NoResource())
|
root_resource = create_resource_tree(resources, NoResource())
|
||||||
|
|
||||||
_base.listen_tcp(
|
_base.listen_tcp(
|
||||||
|
|||||||
@@ -257,5 +257,6 @@ def setup_logging(
|
|||||||
logging.warning("***** STARTING SERVER *****")
|
logging.warning("***** STARTING SERVER *****")
|
||||||
logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse))
|
logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse))
|
||||||
logging.info("Server hostname: %s", config.server_name)
|
logging.info("Server hostname: %s", config.server_name)
|
||||||
|
logging.info("Instance name: %s", hs.get_instance_name())
|
||||||
|
|
||||||
return logger
|
return logger
|
||||||
|
|||||||
@@ -27,6 +27,17 @@ class InstanceLocationConfig:
|
|||||||
port = attr.ib(type=int)
|
port = attr.ib(type=int)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
|
class WriterLocations:
|
||||||
|
"""Specifies the instances that write various streams.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
events: The instance that writes to the event and backfill streams.
|
||||||
|
"""
|
||||||
|
|
||||||
|
events = attr.ib(default="master", type=str)
|
||||||
|
|
||||||
|
|
||||||
class WorkerConfig(Config):
|
class WorkerConfig(Config):
|
||||||
"""The workers are processes run separately to the main synapse process.
|
"""The workers are processes run separately to the main synapse process.
|
||||||
They have their own pid_file and listener configuration. They use the
|
They have their own pid_file and listener configuration. They use the
|
||||||
@@ -88,6 +99,10 @@ class WorkerConfig(Config):
|
|||||||
name: InstanceLocationConfig(**c) for name, c in instance_map.items()
|
name: InstanceLocationConfig(**c) for name, c in instance_map.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Map from type of streams to source, c.f. WriterLocations.
|
||||||
|
writers = config.get("writers", {}) or {}
|
||||||
|
self.writers = WriterLocations(**writers)
|
||||||
|
|
||||||
def read_arguments(self, args):
|
def read_arguments(self, args):
|
||||||
# We support a bunch of command line arguments that override options in
|
# We support a bunch of command line arguments that override options in
|
||||||
# the config. A lot of these options have a worker_* prefix when running
|
# the config. A lot of these options have a worker_* prefix when running
|
||||||
|
|||||||
@@ -125,10 +125,9 @@ class FederationHandler(BaseHandler):
|
|||||||
self._server_notices_mxid = hs.config.server_notices_mxid
|
self._server_notices_mxid = hs.config.server_notices_mxid
|
||||||
self.config = hs.config
|
self.config = hs.config
|
||||||
self.http_client = hs.get_simple_http_client()
|
self.http_client = hs.get_simple_http_client()
|
||||||
|
self._instance_name = hs.get_instance_name()
|
||||||
|
|
||||||
self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client(
|
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
|
||||||
hs
|
|
||||||
)
|
|
||||||
self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
|
self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
|
||||||
hs
|
hs
|
||||||
)
|
)
|
||||||
@@ -2837,8 +2836,9 @@ class FederationHandler(BaseHandler):
|
|||||||
backfilled: Whether these events are a result of
|
backfilled: Whether these events are a result of
|
||||||
backfilling or not
|
backfilling or not
|
||||||
"""
|
"""
|
||||||
if self.config.worker_app:
|
if self.config.worker.writers.events != self._instance_name:
|
||||||
await self._send_events_to_master(
|
await self._send_events(
|
||||||
|
instance_name=self.config.worker.writers.events,
|
||||||
store=self.store,
|
store=self.store,
|
||||||
event_and_contexts=event_and_contexts,
|
event_and_contexts=event_and_contexts,
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
|
|||||||
@@ -365,10 +365,11 @@ class EventCreationHandler(object):
|
|||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self.config = hs.config
|
self.config = hs.config
|
||||||
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
|
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
|
||||||
|
self._instance_name = hs.get_instance_name()
|
||||||
|
|
||||||
self.room_invite_state_types = self.hs.config.room_invite_state_types
|
self.room_invite_state_types = self.hs.config.room_invite_state_types
|
||||||
|
|
||||||
self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
|
self.send_event = ReplicationSendEventRestServlet.make_client(hs)
|
||||||
|
|
||||||
# This is only used to get at ratelimit function, and maybe_kick_guest_users
|
# This is only used to get at ratelimit function, and maybe_kick_guest_users
|
||||||
self.base_handler = BaseHandler(hs)
|
self.base_handler = BaseHandler(hs)
|
||||||
@@ -822,8 +823,9 @@ class EventCreationHandler(object):
|
|||||||
success = False
|
success = False
|
||||||
try:
|
try:
|
||||||
# If we're a worker we need to hit out to the master.
|
# If we're a worker we need to hit out to the master.
|
||||||
if self.config.worker_app:
|
if self.config.worker.writers.events != self._instance_name:
|
||||||
await self.send_event_to_master(
|
await self.send_event(
|
||||||
|
instance_name=self.config.worker.writers.events,
|
||||||
event_id=event.event_id,
|
event_id=event.event_id,
|
||||||
store=self.store,
|
store=self.store,
|
||||||
requester=requester,
|
requester=requester,
|
||||||
@@ -888,7 +890,7 @@ class EventCreationHandler(object):
|
|||||||
|
|
||||||
This should only be run on master.
|
This should only be run on master.
|
||||||
"""
|
"""
|
||||||
assert not self.config.worker_app
|
assert self.config.worker.writers.events == self._instance_name
|
||||||
|
|
||||||
if ratelimit:
|
if ratelimit:
|
||||||
# We check if this is a room admin redacting an event so that we
|
# We check if this is a room admin redacting an event so that we
|
||||||
|
|||||||
@@ -34,9 +34,11 @@ class ReplicationRestResource(JsonResource):
|
|||||||
|
|
||||||
def register_servlets(self, hs):
|
def register_servlets(self, hs):
|
||||||
send_event.register_servlets(hs, self)
|
send_event.register_servlets(hs, self)
|
||||||
membership.register_servlets(hs, self)
|
|
||||||
federation.register_servlets(hs, self)
|
federation.register_servlets(hs, self)
|
||||||
login.register_servlets(hs, self)
|
|
||||||
register.register_servlets(hs, self)
|
if hs.config.worker.worker_app is None:
|
||||||
devices.register_servlets(hs, self)
|
membership.register_servlets(hs, self)
|
||||||
streams.register_servlets(hs, self)
|
login.register_servlets(hs, self)
|
||||||
|
register.register_servlets(hs, self)
|
||||||
|
devices.register_servlets(hs, self)
|
||||||
|
streams.register_servlets(hs, self)
|
||||||
|
|||||||
@@ -142,6 +142,7 @@ class ReplicationEndpoint(object):
|
|||||||
"""
|
"""
|
||||||
clock = hs.get_clock()
|
clock = hs.get_clock()
|
||||||
client = hs.get_simple_http_client()
|
client = hs.get_simple_http_client()
|
||||||
|
local_instance_name = hs.get_instance_name()
|
||||||
|
|
||||||
master_host = hs.config.worker_replication_host
|
master_host = hs.config.worker_replication_host
|
||||||
master_port = hs.config.worker_replication_http_port
|
master_port = hs.config.worker_replication_http_port
|
||||||
@@ -151,6 +152,8 @@ class ReplicationEndpoint(object):
|
|||||||
@trace(opname="outgoing_replication_request")
|
@trace(opname="outgoing_replication_request")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def send_request(instance_name="master", **kwargs):
|
def send_request(instance_name="master", **kwargs):
|
||||||
|
if instance_name == local_instance_name:
|
||||||
|
raise Exception("Trying to send HTTP request to self")
|
||||||
if instance_name == "master":
|
if instance_name == "master":
|
||||||
host = master_host
|
host = master_host
|
||||||
port = master_port
|
port = master_port
|
||||||
|
|||||||
@@ -15,11 +15,6 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
|
||||||
from synapse.replication.tcp.streams.events import (
|
|
||||||
EventsStreamCurrentStateRow,
|
|
||||||
EventsStreamEventRow,
|
|
||||||
)
|
|
||||||
from synapse.storage.data_stores.main.event_federation import EventFederationWorkerStore
|
from synapse.storage.data_stores.main.event_federation import EventFederationWorkerStore
|
||||||
from synapse.storage.data_stores.main.event_push_actions import (
|
from synapse.storage.data_stores.main.event_push_actions import (
|
||||||
EventPushActionsWorkerStore,
|
EventPushActionsWorkerStore,
|
||||||
@@ -35,7 +30,6 @@ from synapse.storage.database import Database
|
|||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -62,11 +56,6 @@ class SlavedEventStore(
|
|||||||
BaseSlavedStore,
|
BaseSlavedStore,
|
||||||
):
|
):
|
||||||
def __init__(self, database: Database, db_conn, hs):
|
def __init__(self, database: Database, db_conn, hs):
|
||||||
self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
|
|
||||||
self._backfill_id_gen = SlavedIdTracker(
|
|
||||||
db_conn, "events", "stream_ordering", step=-1
|
|
||||||
)
|
|
||||||
|
|
||||||
super(SlavedEventStore, self).__init__(database, db_conn, hs)
|
super(SlavedEventStore, self).__init__(database, db_conn, hs)
|
||||||
|
|
||||||
events_max = self._stream_id_gen.get_current_token()
|
events_max = self._stream_id_gen.get_current_token()
|
||||||
@@ -92,81 +81,3 @@ class SlavedEventStore(
|
|||||||
|
|
||||||
def get_room_min_stream_ordering(self):
|
def get_room_min_stream_ordering(self):
|
||||||
return self._backfill_id_gen.get_current_token()
|
return self._backfill_id_gen.get_current_token()
|
||||||
|
|
||||||
def process_replication_rows(self, stream_name, instance_name, token, rows):
|
|
||||||
if stream_name == "events":
|
|
||||||
self._stream_id_gen.advance(token)
|
|
||||||
for row in rows:
|
|
||||||
self._process_event_stream_row(token, row)
|
|
||||||
elif stream_name == "backfill":
|
|
||||||
self._backfill_id_gen.advance(-token)
|
|
||||||
for row in rows:
|
|
||||||
self.invalidate_caches_for_event(
|
|
||||||
-token,
|
|
||||||
row.event_id,
|
|
||||||
row.room_id,
|
|
||||||
row.type,
|
|
||||||
row.state_key,
|
|
||||||
row.redacts,
|
|
||||||
row.relates_to,
|
|
||||||
backfilled=True,
|
|
||||||
)
|
|
||||||
return super().process_replication_rows(stream_name, instance_name, token, rows)
|
|
||||||
|
|
||||||
def _process_event_stream_row(self, token, row):
|
|
||||||
data = row.data
|
|
||||||
|
|
||||||
if row.type == EventsStreamEventRow.TypeId:
|
|
||||||
self.invalidate_caches_for_event(
|
|
||||||
token,
|
|
||||||
data.event_id,
|
|
||||||
data.room_id,
|
|
||||||
data.type,
|
|
||||||
data.state_key,
|
|
||||||
data.redacts,
|
|
||||||
data.relates_to,
|
|
||||||
backfilled=False,
|
|
||||||
)
|
|
||||||
elif row.type == EventsStreamCurrentStateRow.TypeId:
|
|
||||||
self._curr_state_delta_stream_cache.entity_has_changed(
|
|
||||||
row.data.room_id, token
|
|
||||||
)
|
|
||||||
|
|
||||||
if data.type == EventTypes.Member:
|
|
||||||
self.get_rooms_for_user_with_stream_ordering.invalidate(
|
|
||||||
(data.state_key,)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
raise Exception("Unknown events stream row type %s" % (row.type,))
|
|
||||||
|
|
||||||
def invalidate_caches_for_event(
|
|
||||||
self,
|
|
||||||
stream_ordering,
|
|
||||||
event_id,
|
|
||||||
room_id,
|
|
||||||
etype,
|
|
||||||
state_key,
|
|
||||||
redacts,
|
|
||||||
relates_to,
|
|
||||||
backfilled,
|
|
||||||
):
|
|
||||||
self._invalidate_get_event_cache(event_id)
|
|
||||||
|
|
||||||
self.get_latest_event_ids_in_room.invalidate((room_id,))
|
|
||||||
|
|
||||||
self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
|
|
||||||
|
|
||||||
if not backfilled:
|
|
||||||
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
|
|
||||||
|
|
||||||
if redacts:
|
|
||||||
self._invalidate_get_event_cache(redacts)
|
|
||||||
|
|
||||||
if etype == EventTypes.Member:
|
|
||||||
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
|
|
||||||
self.get_invited_rooms_for_local_user.invalidate((state_key,))
|
|
||||||
|
|
||||||
if relates_to:
|
|
||||||
self.get_relations_for_event.invalidate_many((relates_to,))
|
|
||||||
self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
|
|
||||||
self.get_applicable_edit.invalidate((relates_to,))
|
|
||||||
|
|||||||
@@ -15,19 +15,11 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore
|
from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore
|
||||||
from synapse.storage.database import Database
|
|
||||||
|
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
|
||||||
from .events import SlavedEventStore
|
from .events import SlavedEventStore
|
||||||
|
|
||||||
|
|
||||||
class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
|
class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
|
||||||
def __init__(self, database: Database, db_conn, hs):
|
|
||||||
self._push_rules_stream_id_gen = SlavedIdTracker(
|
|
||||||
db_conn, "push_rules_stream", "stream_id"
|
|
||||||
)
|
|
||||||
super(SlavedPushRuleStore, self).__init__(database, db_conn, hs)
|
|
||||||
|
|
||||||
def get_push_rules_stream_token(self):
|
def get_push_rules_stream_token(self):
|
||||||
return (
|
return (
|
||||||
self._push_rules_stream_id_gen.get_current_token(),
|
self._push_rules_stream_id_gen.get_current_token(),
|
||||||
|
|||||||
@@ -38,7 +38,9 @@ from synapse.replication.tcp.commands import (
|
|||||||
from synapse.replication.tcp.protocol import AbstractConnection
|
from synapse.replication.tcp.protocol import AbstractConnection
|
||||||
from synapse.replication.tcp.streams import (
|
from synapse.replication.tcp.streams import (
|
||||||
STREAMS_MAP,
|
STREAMS_MAP,
|
||||||
|
BackfillStream,
|
||||||
CachesStream,
|
CachesStream,
|
||||||
|
EventsStream,
|
||||||
FederationStream,
|
FederationStream,
|
||||||
Stream,
|
Stream,
|
||||||
)
|
)
|
||||||
@@ -87,6 +89,12 @@ class ReplicationCommandHandler:
|
|||||||
self._streams_to_replicate.append(stream)
|
self._streams_to_replicate.append(stream)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if (
|
||||||
|
isinstance(stream, (EventsStream, BackfillStream))
|
||||||
|
and hs.config.worker.writers.events == hs.get_instance_name()
|
||||||
|
):
|
||||||
|
self._streams_to_replicate.append(stream)
|
||||||
|
|
||||||
# Only add any other streams if we're on master.
|
# Only add any other streams if we're on master.
|
||||||
if hs.config.worker_app is not None:
|
if hs.config.worker_app is not None:
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -66,9 +66,9 @@ class DataStores(object):
|
|||||||
|
|
||||||
self.main = main_store_class(database, db_conn, hs)
|
self.main = main_store_class(database, db_conn, hs)
|
||||||
|
|
||||||
# If we're on a process that can persist events (currently
|
# If we're on a process that can persist events also
|
||||||
# master), also instantiate a `PersistEventsStore`
|
# instansiate a `PersistEventsStore`
|
||||||
if hs.config.worker.worker_app is None:
|
if hs.config.worker.writers.events == hs.get_instance_name():
|
||||||
self.persist_events = PersistEventsStore(
|
self.persist_events = PersistEventsStore(
|
||||||
hs, database, self.main
|
hs, database, self.main
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ from synapse.config.homeserver import HomeServerConfig
|
|||||||
from synapse.storage.database import Database
|
from synapse.storage.database import Database
|
||||||
from synapse.storage.engines import PostgresEngine
|
from synapse.storage.engines import PostgresEngine
|
||||||
from synapse.storage.util.id_generators import (
|
from synapse.storage.util.id_generators import (
|
||||||
ChainedIdGenerator,
|
|
||||||
IdGenerator,
|
IdGenerator,
|
||||||
MultiWriterIdGenerator,
|
MultiWriterIdGenerator,
|
||||||
StreamIdGenerator,
|
StreamIdGenerator,
|
||||||
@@ -125,19 +124,6 @@ class DataStore(
|
|||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
self.database_engine = database.engine
|
self.database_engine = database.engine
|
||||||
|
|
||||||
self._stream_id_gen = StreamIdGenerator(
|
|
||||||
db_conn,
|
|
||||||
"events",
|
|
||||||
"stream_ordering",
|
|
||||||
extra_tables=[("local_invites", "stream_id")],
|
|
||||||
)
|
|
||||||
self._backfill_id_gen = StreamIdGenerator(
|
|
||||||
db_conn,
|
|
||||||
"events",
|
|
||||||
"stream_ordering",
|
|
||||||
step=-1,
|
|
||||||
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
|
|
||||||
)
|
|
||||||
self._presence_id_gen = StreamIdGenerator(
|
self._presence_id_gen = StreamIdGenerator(
|
||||||
db_conn, "presence_stream", "stream_id"
|
db_conn, "presence_stream", "stream_id"
|
||||||
)
|
)
|
||||||
@@ -164,9 +150,6 @@ class DataStore(
|
|||||||
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
|
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
|
||||||
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
|
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
|
||||||
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
|
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
|
||||||
self._push_rules_stream_id_gen = ChainedIdGenerator(
|
|
||||||
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
|
|
||||||
)
|
|
||||||
self._pushers_id_gen = StreamIdGenerator(
|
self._pushers_id_gen = StreamIdGenerator(
|
||||||
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
|
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -16,8 +16,13 @@
|
|||||||
|
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
from typing import Any, Iterable, Optional
|
from typing import Any, Iterable, Optional, Tuple
|
||||||
|
|
||||||
|
from synapse.api.constants import EventTypes
|
||||||
|
from synapse.replication.tcp.streams.events import (
|
||||||
|
EventsStreamCurrentStateRow,
|
||||||
|
EventsStreamEventRow,
|
||||||
|
)
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import Database
|
from synapse.storage.database import Database
|
||||||
from synapse.storage.engines import PostgresEngine
|
from synapse.storage.engines import PostgresEngine
|
||||||
@@ -66,7 +71,24 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def process_replication_rows(self, stream_name, instance_name, token, rows):
|
def process_replication_rows(self, stream_name, instance_name, token, rows):
|
||||||
if stream_name == "caches":
|
if stream_name == "events":
|
||||||
|
self._stream_id_gen.advance(token)
|
||||||
|
for row in rows:
|
||||||
|
self._process_event_stream_row(token, row)
|
||||||
|
elif stream_name == "backfill":
|
||||||
|
self._backfill_id_gen.advance(-token)
|
||||||
|
for row in rows:
|
||||||
|
self._invalidate_caches_for_event(
|
||||||
|
-token,
|
||||||
|
row.event_id,
|
||||||
|
row.room_id,
|
||||||
|
row.type,
|
||||||
|
row.state_key,
|
||||||
|
row.redacts,
|
||||||
|
row.relates_to,
|
||||||
|
backfilled=True,
|
||||||
|
)
|
||||||
|
elif stream_name == "caches":
|
||||||
if self._cache_id_gen:
|
if self._cache_id_gen:
|
||||||
self._cache_id_gen.advance(instance_name, token)
|
self._cache_id_gen.advance(instance_name, token)
|
||||||
|
|
||||||
@@ -85,6 +107,84 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
|||||||
|
|
||||||
super().process_replication_rows(stream_name, instance_name, token, rows)
|
super().process_replication_rows(stream_name, instance_name, token, rows)
|
||||||
|
|
||||||
|
def _process_event_stream_row(self, token, row):
|
||||||
|
data = row.data
|
||||||
|
|
||||||
|
if row.type == EventsStreamEventRow.TypeId:
|
||||||
|
self._invalidate_caches_for_event(
|
||||||
|
token,
|
||||||
|
data.event_id,
|
||||||
|
data.room_id,
|
||||||
|
data.type,
|
||||||
|
data.state_key,
|
||||||
|
data.redacts,
|
||||||
|
data.relates_to,
|
||||||
|
backfilled=False,
|
||||||
|
)
|
||||||
|
elif row.type == EventsStreamCurrentStateRow.TypeId:
|
||||||
|
self._curr_state_delta_stream_cache.entity_has_changed(
|
||||||
|
row.data.room_id, token
|
||||||
|
)
|
||||||
|
|
||||||
|
if data.type == EventTypes.Member:
|
||||||
|
self.get_rooms_for_user_with_stream_ordering.invalidate(
|
||||||
|
(data.state_key,)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise Exception("Unknown events stream row type %s" % (row.type,))
|
||||||
|
|
||||||
|
def _invalidate_caches_for_event(
|
||||||
|
self,
|
||||||
|
stream_ordering,
|
||||||
|
event_id,
|
||||||
|
room_id,
|
||||||
|
etype,
|
||||||
|
state_key,
|
||||||
|
redacts,
|
||||||
|
relates_to,
|
||||||
|
backfilled,
|
||||||
|
):
|
||||||
|
self._invalidate_get_event_cache(event_id)
|
||||||
|
|
||||||
|
self.get_latest_event_ids_in_room.invalidate((room_id,))
|
||||||
|
|
||||||
|
self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
|
||||||
|
|
||||||
|
if not backfilled:
|
||||||
|
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
|
||||||
|
|
||||||
|
if redacts:
|
||||||
|
self._invalidate_get_event_cache(redacts)
|
||||||
|
|
||||||
|
if etype == EventTypes.Member:
|
||||||
|
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
|
||||||
|
self.get_invited_rooms_for_local_user.invalidate((state_key,))
|
||||||
|
|
||||||
|
if relates_to:
|
||||||
|
self.get_relations_for_event.invalidate_many((relates_to,))
|
||||||
|
self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
|
||||||
|
self.get_applicable_edit.invalidate((relates_to,))
|
||||||
|
|
||||||
|
async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
|
||||||
|
"""Invalidates the cache and adds it to the cache stream so slaves
|
||||||
|
will know to invalidate their caches.
|
||||||
|
|
||||||
|
This should only be used to invalidate caches where slaves won't
|
||||||
|
otherwise know from other replication streams that the cache should
|
||||||
|
be invalidated.
|
||||||
|
"""
|
||||||
|
cache_func = getattr(self, cache_name, None)
|
||||||
|
if not cache_func:
|
||||||
|
return
|
||||||
|
|
||||||
|
cache_func.invalidate(keys)
|
||||||
|
await self.db.runInteraction(
|
||||||
|
"invalidate_cache_and_stream",
|
||||||
|
self._send_invalidation_to_replication,
|
||||||
|
cache_func.__name__,
|
||||||
|
keys,
|
||||||
|
)
|
||||||
|
|
||||||
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
|
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
|
||||||
"""Invalidates the cache and adds it to the cache stream so slaves
|
"""Invalidates the cache and adds it to the cache stream so slaves
|
||||||
will know to invalidate their caches.
|
will know to invalidate their caches.
|
||||||
|
|||||||
@@ -138,10 +138,10 @@ class PersistEventsStore:
|
|||||||
self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator
|
self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator
|
||||||
self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator
|
self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator
|
||||||
|
|
||||||
# This should only exist on master for now
|
# This should only exist on instances that are configured to write
|
||||||
assert (
|
assert (
|
||||||
hs.config.worker.worker_app is None
|
hs.config.worker.writers.events == hs.get_instance_name()
|
||||||
), "Can only instantiate PersistEventsStore on master"
|
), "Can only instantiate EventsStore on master"
|
||||||
|
|
||||||
@_retry_on_integrity_error
|
@_retry_on_integrity_error
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
|||||||
@@ -37,8 +37,10 @@ from synapse.events import make_event_from_dict
|
|||||||
from synapse.events.utils import prune_event
|
from synapse.events.utils import prune_event
|
||||||
from synapse.logging.context import PreserveLoggingContext, current_context
|
from synapse.logging.context import PreserveLoggingContext, current_context
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
|
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||||
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
||||||
from synapse.storage.database import Database
|
from synapse.storage.database import Database
|
||||||
|
from synapse.storage.util.id_generators import StreamIdGenerator
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks
|
from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks
|
||||||
from synapse.util.iterutils import batch_iter
|
from synapse.util.iterutils import batch_iter
|
||||||
@@ -74,6 +76,26 @@ class EventsWorkerStore(SQLBaseStore):
|
|||||||
def __init__(self, database: Database, db_conn, hs):
|
def __init__(self, database: Database, db_conn, hs):
|
||||||
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
|
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
|
||||||
|
|
||||||
|
if hs.config.worker.writers.events == hs.get_instance_name():
|
||||||
|
self._stream_id_gen = StreamIdGenerator(
|
||||||
|
db_conn,
|
||||||
|
"events",
|
||||||
|
"stream_ordering",
|
||||||
|
extra_tables=[("local_invites", "stream_id")],
|
||||||
|
)
|
||||||
|
self._backfill_id_gen = StreamIdGenerator(
|
||||||
|
db_conn,
|
||||||
|
"events",
|
||||||
|
"stream_ordering",
|
||||||
|
step=-1,
|
||||||
|
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
|
||||||
|
self._backfill_id_gen = SlavedIdTracker(
|
||||||
|
db_conn, "events", "stream_ordering", step=-1
|
||||||
|
)
|
||||||
|
|
||||||
self._get_event_cache = Cache(
|
self._get_event_cache = Cache(
|
||||||
"*getEvent*",
|
"*getEvent*",
|
||||||
keylen=3,
|
keylen=3,
|
||||||
|
|||||||
@@ -16,12 +16,14 @@
|
|||||||
|
|
||||||
import abc
|
import abc
|
||||||
import logging
|
import logging
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
from canonicaljson import json
|
from canonicaljson import json
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.push.baserules import list_with_base_rules
|
from synapse.push.baserules import list_with_base_rules
|
||||||
|
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.data_stores.main.appservice import ApplicationServiceWorkerStore
|
from synapse.storage.data_stores.main.appservice import ApplicationServiceWorkerStore
|
||||||
from synapse.storage.data_stores.main.pusher import PusherWorkerStore
|
from synapse.storage.data_stores.main.pusher import PusherWorkerStore
|
||||||
@@ -29,6 +31,7 @@ from synapse.storage.data_stores.main.receipts import ReceiptsWorkerStore
|
|||||||
from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
|
from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
|
||||||
from synapse.storage.database import Database
|
from synapse.storage.database import Database
|
||||||
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
|
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
|
||||||
|
from synapse.storage.util.id_generators import ChainedIdGenerator
|
||||||
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
|
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
@@ -77,6 +80,15 @@ class PushRulesWorkerStore(
|
|||||||
def __init__(self, database: Database, db_conn, hs):
|
def __init__(self, database: Database, db_conn, hs):
|
||||||
super(PushRulesWorkerStore, self).__init__(database, db_conn, hs)
|
super(PushRulesWorkerStore, self).__init__(database, db_conn, hs)
|
||||||
|
|
||||||
|
if hs.config.worker.worker_app is None:
|
||||||
|
self._push_rules_stream_id_gen = ChainedIdGenerator(
|
||||||
|
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
|
||||||
|
) # type: Union[ChainedIdGenerator, SlavedIdTracker]
|
||||||
|
else:
|
||||||
|
self._push_rules_stream_id_gen = SlavedIdTracker(
|
||||||
|
db_conn, "push_rules_stream", "stream_id"
|
||||||
|
)
|
||||||
|
|
||||||
push_rules_prefill, push_rules_id = self.db.get_cache_dict(
|
push_rules_prefill, push_rules_id = self.db.get_cache_dict(
|
||||||
db_conn,
|
db_conn,
|
||||||
"push_rules_stream",
|
"push_rules_stream",
|
||||||
|
|||||||
@@ -37,7 +37,55 @@ SearchEntry = namedtuple(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class SearchBackgroundUpdateStore(SQLBaseStore):
|
class SearchWorkerStore(SQLBaseStore):
|
||||||
|
def store_search_entries_txn(self, txn, entries):
|
||||||
|
"""Add entries to the search table
|
||||||
|
|
||||||
|
Args:
|
||||||
|
txn (cursor):
|
||||||
|
entries (iterable[SearchEntry]):
|
||||||
|
entries to be added to the table
|
||||||
|
"""
|
||||||
|
if not self.hs.config.enable_search:
|
||||||
|
return
|
||||||
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
|
sql = (
|
||||||
|
"INSERT INTO event_search"
|
||||||
|
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
|
||||||
|
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (
|
||||||
|
(
|
||||||
|
entry.event_id,
|
||||||
|
entry.room_id,
|
||||||
|
entry.key,
|
||||||
|
entry.value,
|
||||||
|
entry.stream_ordering,
|
||||||
|
entry.origin_server_ts,
|
||||||
|
)
|
||||||
|
for entry in entries
|
||||||
|
)
|
||||||
|
|
||||||
|
txn.executemany(sql, args)
|
||||||
|
|
||||||
|
elif isinstance(self.database_engine, Sqlite3Engine):
|
||||||
|
sql = (
|
||||||
|
"INSERT INTO event_search (event_id, room_id, key, value)"
|
||||||
|
" VALUES (?,?,?,?)"
|
||||||
|
)
|
||||||
|
args = (
|
||||||
|
(entry.event_id, entry.room_id, entry.key, entry.value)
|
||||||
|
for entry in entries
|
||||||
|
)
|
||||||
|
|
||||||
|
txn.executemany(sql, args)
|
||||||
|
else:
|
||||||
|
# This should be unreachable.
|
||||||
|
raise Exception("Unrecognized database engine")
|
||||||
|
|
||||||
|
|
||||||
|
class SearchBackgroundUpdateStore(SearchWorkerStore):
|
||||||
|
|
||||||
EVENT_SEARCH_UPDATE_NAME = "event_search"
|
EVENT_SEARCH_UPDATE_NAME = "event_search"
|
||||||
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
|
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
|
||||||
@@ -296,52 +344,6 @@ class SearchBackgroundUpdateStore(SQLBaseStore):
|
|||||||
|
|
||||||
return num_rows
|
return num_rows
|
||||||
|
|
||||||
def store_search_entries_txn(self, txn, entries):
|
|
||||||
"""Add entries to the search table
|
|
||||||
|
|
||||||
Args:
|
|
||||||
txn (cursor):
|
|
||||||
entries (iterable[SearchEntry]):
|
|
||||||
entries to be added to the table
|
|
||||||
"""
|
|
||||||
if not self.hs.config.enable_search:
|
|
||||||
return
|
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
|
||||||
sql = (
|
|
||||||
"INSERT INTO event_search"
|
|
||||||
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
|
|
||||||
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
|
|
||||||
)
|
|
||||||
|
|
||||||
args = (
|
|
||||||
(
|
|
||||||
entry.event_id,
|
|
||||||
entry.room_id,
|
|
||||||
entry.key,
|
|
||||||
entry.value,
|
|
||||||
entry.stream_ordering,
|
|
||||||
entry.origin_server_ts,
|
|
||||||
)
|
|
||||||
for entry in entries
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.executemany(sql, args)
|
|
||||||
|
|
||||||
elif isinstance(self.database_engine, Sqlite3Engine):
|
|
||||||
sql = (
|
|
||||||
"INSERT INTO event_search (event_id, room_id, key, value)"
|
|
||||||
" VALUES (?,?,?,?)"
|
|
||||||
)
|
|
||||||
args = (
|
|
||||||
(entry.event_id, entry.room_id, entry.key, entry.value)
|
|
||||||
for entry in entries
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.executemany(sql, args)
|
|
||||||
else:
|
|
||||||
# This should be unreachable.
|
|
||||||
raise Exception("Unrecognized database engine")
|
|
||||||
|
|
||||||
|
|
||||||
class SearchStore(SearchBackgroundUpdateStore):
|
class SearchStore(SearchBackgroundUpdateStore):
|
||||||
def __init__(self, database: Database, db_conn, hs):
|
def __init__(self, database: Database, db_conn, hs):
|
||||||
|
|||||||
@@ -166,6 +166,7 @@ class ChainedIdGenerator(object):
|
|||||||
|
|
||||||
def __init__(self, chained_generator, db_conn, table, column):
|
def __init__(self, chained_generator, db_conn, table, column):
|
||||||
self.chained_generator = chained_generator
|
self.chained_generator = chained_generator
|
||||||
|
self._table = table
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
self._current_max = _load_current_id(db_conn, table, column)
|
self._current_max = _load_current_id(db_conn, table, column)
|
||||||
self._unfinished_ids = deque() # type: Deque[Tuple[int, int]]
|
self._unfinished_ids = deque() # type: Deque[Tuple[int, int]]
|
||||||
@@ -204,6 +205,16 @@ class ChainedIdGenerator(object):
|
|||||||
|
|
||||||
return self._current_max, self.chained_generator.get_current_token()
|
return self._current_max, self.chained_generator.get_current_token()
|
||||||
|
|
||||||
|
def advance(self, token: int):
|
||||||
|
"""Stub implementation for advancing the token when receiving updates
|
||||||
|
over replication; raises an exception as this instance should be the
|
||||||
|
only source of updates.
|
||||||
|
"""
|
||||||
|
|
||||||
|
raise Exception(
|
||||||
|
"Attempted to advance token on source for table %r", self._table
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class MultiWriterIdGenerator:
|
class MultiWriterIdGenerator:
|
||||||
"""An ID generator that tracks a stream that can have multiple writers.
|
"""An ID generator that tracks a stream that can have multiple writers.
|
||||||
|
|||||||
Reference in New Issue
Block a user