Compare commits

2 Commits

Author SHA1 Message Date
Richard van der Hoff
9a9a4d9099 support for dropping reads of state_events and rejections 2022-07-07 17:46:19 +01:00
Richard van der Hoff
c3132e22fd Bg update to populate new events table columns
These columns were added back in Synapse 1.52, and have been populated for new
events since then. It's now (beyond) time to back-populate them for existing
events.
2022-07-07 17:10:19 +01:00
8 changed files with 384 additions and 93 deletions

changelog.d/13215.misc (new file, +1 line)
View File

@@ -0,0 +1 @@
Preparation for database schema simplifications: populate `state_key` and `rejection_reason` for existing rows in the `events` table.

View File

@@ -36,6 +36,10 @@ class SQLBaseStore(metaclass=ABCMeta):
per data store (and not one per physical database).
"""
# if set to False, we will query the `state_events` and `rejections` tables when
# fetching event data. When True, we rely on it all being in the `events` table.
STATE_KEY_IN_EVENTS = False
def __init__(
self,
database: DatabasePool,

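The new STATE_KEY_IN_EVENTS flag is what the remaining files in this diff branch on whenever event metadata is read. As a minimal illustration of the pattern (an editor's sketch, not part of the diff; the helper name is hypothetical):

def _get_rejection_reason_txn(store, txn, event_id: str):
    # hypothetical helper, for illustration only: shows the two query shapes
    # that the flag selects between
    if store.STATE_KEY_IN_EVENTS:
        # new schema: the rejection reason lives directly on the `events` row
        txn.execute(
            "SELECT rejection_reason FROM events WHERE event_id = ?",
            (event_id,),
        )
    else:
        # old schema: join out to the `rejections` side table
        txn.execute(
            "SELECT reason FROM rejections WHERE event_id = ?",
            (event_id,),
        )
    row = txn.fetchone()
    return row[0] if row else None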
View File

@@ -257,17 +257,30 @@ class PersistEventsStore:
def _get_events_which_are_prevs_txn(
txn: LoggingTransaction, batch: Collection[str]
) -> None:
sql = """
SELECT prev_event_id, internal_metadata
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
LEFT JOIN event_json USING (event_id)
WHERE
NOT events.outlier
AND rejections.event_id IS NULL
AND
"""
if self.store.STATE_KEY_IN_EVENTS:
sql = """
SELECT prev_event_id, internal_metadata
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN event_json USING (event_id)
WHERE
NOT events.outlier
AND events.rejection_reason IS NULL
AND
"""
else:
sql = """
SELECT prev_event_id, internal_metadata
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
LEFT JOIN event_json USING (event_id)
WHERE
NOT events.outlier
AND rejections.event_id IS NULL
AND
"""
clause, args = make_in_list_sql_clause(
self.database_engine, "prev_event_id", batch
@@ -311,7 +324,19 @@ class PersistEventsStore:
) -> None:
to_recursively_check = batch
while to_recursively_check:
if self.store.STATE_KEY_IN_EVENTS:
sql = """
SELECT
event_id, prev_event_id, internal_metadata,
events.rejection_reason IS NOT NULL
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN event_json USING (event_id)
WHERE
NOT events.outlier
AND
"""
else:
sql = """
SELECT
event_id, prev_event_id, internal_metadata,
@@ -325,6 +350,7 @@ class PersistEventsStore:
AND
"""
while to_recursively_check:
clause, args = make_in_list_sql_clause(
self.database_engine, "event_id", to_recursively_check
)
@@ -530,6 +556,7 @@ class PersistEventsStore:
event_to_room_id = {e.event_id: e.room_id for e in state_events.values()}
self._add_chain_cover_index(
self.store.STATE_KEY_IN_EVENTS,
txn,
self.db_pool,
self.store.event_chain_id_gen,
@@ -541,6 +568,7 @@ class PersistEventsStore:
@classmethod
def _add_chain_cover_index(
cls,
state_key_in_events: bool,
txn: LoggingTransaction,
db_pool: DatabasePool,
event_chain_id_gen: SequenceGenerator,
@@ -551,6 +579,8 @@ class PersistEventsStore:
"""Calculate the chain cover index for the given events.
Args:
state_key_in_events: whether to use the `state_key` column in the `events`
table in preference to the `state_events` table
event_to_room_id: Event ID to the room ID of the event
event_to_types: Event ID to type and state_key of the event
event_to_auth_chain: Event ID to list of auth event IDs of the
@@ -610,7 +640,15 @@ class PersistEventsStore:
# We loop here in case we find an out of band membership and need to
# fetch their auth event info.
while missing_auth_chains:
if state_key_in_events:
sql = """
SELECT event_id, events.type, events.state_key, chain_id, sequence_number
FROM events
LEFT JOIN event_auth_chains USING (event_id)
WHERE
events.state_key IS NOT NULL AND
"""
else:
sql = """
SELECT event_id, events.type, se.state_key, chain_id, sequence_number
FROM events
@@ -618,6 +656,8 @@ class PersistEventsStore:
LEFT JOIN event_auth_chains USING (event_id)
WHERE
"""
while missing_auth_chains:
clause, args = make_in_list_sql_clause(
txn.database_engine,
"event_id",
@@ -1641,22 +1681,31 @@ class PersistEventsStore:
) -> None:
to_prefill = []
rows = []
ev_map = {e.event_id: e for e, _ in events_and_contexts}
if not ev_map:
return
sql = (
"SELECT "
" e.event_id as event_id, "
" r.redacts as redacts,"
" rej.event_id as rejects "
" FROM events as e"
" LEFT JOIN rejections as rej USING (event_id)"
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
" WHERE "
)
if self.store.STATE_KEY_IN_EVENTS:
sql = (
"SELECT "
" e.event_id as event_id, "
" r.redacts as redacts,"
" e.rejection_reason as rejects "
" FROM events as e"
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
" WHERE "
)
else:
sql = (
"SELECT "
" e.event_id as event_id, "
" r.redacts as redacts,"
" rej.event_id as rejects "
" FROM events as e"
" LEFT JOIN rejections as rej USING (event_id)"
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
" WHERE "
)
clause, args = make_in_list_sql_clause(
self.database_engine, "e.event_id", list(ev_map)

View File

@@ -67,6 +67,8 @@ class _BackgroundUpdates:
EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
@@ -253,6 +255,11 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
replaces_index="ev_edges_id",
)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
self._background_events_populate_state_key_rejections,
)
async def _background_reindex_fields_sender(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -441,6 +448,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
# First, we get `batch_size` events from the table, pulling out
# their successor events, if any, and the successor events'
# rejection status.
# this should happen before the bg update which drops 'rejections'
assert not self.STATE_KEY_IN_EVENTS
txn.execute(
"""SELECT prev_event_id, event_id, internal_metadata,
rejections.event_id IS NOT NULL, events.outlier
@@ -966,6 +977,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
extra_clause = "AND events.room_id = ?"
tuple_args.append(last_room_id)
# this should happen before the bg update which drops 'state_events'
assert not self.STATE_KEY_IN_EVENTS
sql = """
SELECT
event_id, state_events.type, state_events.state_key,
@@ -1034,9 +1048,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
# Calculate and persist the chain cover index for this set of events.
#
# Annoyingly we need to gut wrench into the persit event store so that
# Annoyingly we need to gut wrench into the persist event store so that
# we can reuse the function to calculate the chain cover for rooms.
PersistEventsStore._add_chain_cover_index(
False,
txn,
self.db_pool,
self.event_chain_id_gen, # type: ignore[attr-defined]
@@ -1399,3 +1414,95 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
return batch_size
async def _background_events_populate_state_key_rejections(
self, progress: JsonDict, batch_size: int
) -> int:
"""Back-populate `events.state_key` and `events.rejection_reason"""
min_stream_ordering_exclusive = progress["min_stream_ordering_exclusive"]
max_stream_ordering_inclusive = progress["max_stream_ordering_inclusive"]
def _populate_txn(txn: LoggingTransaction) -> bool:
"""Returns True if we're done."""
# first we need to find an endpoint.
# we need to find the final row in the batch of batch_size, which means
# we need to skip over (batch_size-1) rows and get the next row.
txn.execute(
"""
SELECT stream_ordering FROM events
WHERE stream_ordering > ? AND stream_ordering <= ?
ORDER BY stream_ordering
LIMIT 1 OFFSET ?
""",
(
min_stream_ordering_exclusive,
max_stream_ordering_inclusive,
batch_size - 1,
),
)
endpoint = None
row = txn.fetchone()
if row:
endpoint = row[0]
where_clause = "e.stream_ordering > ?"
args = [min_stream_ordering_exclusive]
if endpoint:
where_clause += " AND e.stream_ordering <= ?"
args.append(endpoint)
# now do the updates. We consider rows within our range of stream orderings,
# but only those with a non-null rejection reason or state_key (since there
# is nothing to update for rows where rejection reason and state_key are
# both null).
txn.execute(
f"""
WITH t AS (
SELECT e.event_id, r.reason, se.state_key
FROM events e
LEFT JOIN rejections r USING (event_id)
LEFT JOIN state_events se USING (event_id)
WHERE ({where_clause}) AND (
r.reason IS NOT NULL OR se.state_key IS NOT NULL
)
)
UPDATE events
SET rejection_reason=t.reason, state_key=t.state_key
FROM t WHERE events.event_id = t.event_id
""",
args,
)
logger.info(
"populated new `events` columns up to %s/%i: updated %i/%i rows",
endpoint,
max_stream_ordering_inclusive,
txn.rowcount,
batch_size,
)
if endpoint is None:
# we're done
return True
progress["min_stream_ordering_exclusive"] = endpoint
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
progress,
)
return False
done = await self.db_pool.runInteraction(
desc="events_populate_state_key_rejections", func=_populate_txn
)
if done:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS
)
return batch_size
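As a worked example of the batching above (an editor's toy simulation with made-up stream orderings, not part of the diff): the endpoint query skips batch_size - 1 qualifying rows and returns the next stream ordering; each pass then updates the rows in (min_stream_ordering_exclusive, endpoint] and records the endpoint as the new lower bound, until no full batch remains.

def next_endpoint(orderings, lo, hi, batch_size):
    # mimics the SELECT ... LIMIT 1 OFFSET (batch_size - 1) query above
    in_range = sorted(o for o in orderings if lo < o <= hi)
    return in_range[batch_size - 1] if len(in_range) >= batch_size else None

orderings = list(range(1, 251))   # stand-in for events.stream_ordering
lo, hi, batch_size = 0, 250, 100  # as seeded by the delta: MIN - 1 and MAX
while True:
    endpoint = next_endpoint(orderings, lo, hi, batch_size)
    upper = endpoint if endpoint is not None else hi
    # the real UPDATE drops the upper bound on the final pass
    print(f"update rows with stream_ordering in ({lo}, {upper}]")
    if endpoint is None:
        break  # fewer than batch_size rows left: this was the final pass
    lo = endpoint  # becomes the new min_stream_ordering_exclusive in `progress`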

View File

@@ -1482,20 +1482,35 @@ class EventsWorkerStore(SQLBaseStore):
def get_all_new_forward_event_rows(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" LEFT JOIN rejections USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
if self.STATE_KEY_IN_EVENTS:
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
" e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
else:
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" LEFT JOIN rejections USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
txn.execute(sql, (last_id, current_id, instance_name, limit))
return cast(
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
@@ -1523,21 +1538,36 @@ class EventsWorkerStore(SQLBaseStore):
def get_ex_outlier_stream_rows_txn(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" LEFT JOIN rejections USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" AND out.instance_name = ?"
" ORDER BY event_stream_ordering ASC"
)
if self.STATE_KEY_IN_EVENTS:
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL"
" FROM events AS e"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" AND out.instance_name = ?"
" ORDER BY event_stream_ordering ASC"
)
else:
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" LEFT JOIN rejections USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" AND out.instance_name = ?"
" ORDER BY event_stream_ordering ASC"
)
txn.execute(sql, (last_id, current_id, instance_name))
return cast(
@@ -1581,18 +1611,32 @@ class EventsWorkerStore(SQLBaseStore):
def get_all_new_backfill_event_rows(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, str]]], int, bool]:
sql = (
"SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > stream_ordering AND stream_ordering >= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
if self.STATE_KEY_IN_EVENTS:
sql = (
"SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
" e.state_key, redacts, relates_to_id"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > stream_ordering AND stream_ordering >= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
else:
sql = (
"SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > stream_ordering AND stream_ordering >= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
txn.execute(sql, (-last_id, -current_id, instance_name, limit))
new_event_updates: List[
Tuple[int, Tuple[str, str, str, str, str, str]]
@@ -1611,19 +1655,34 @@ class EventsWorkerStore(SQLBaseStore):
else:
upper_bound = current_id
sql = (
"SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id"
" FROM events AS e"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > event_stream_ordering"
" AND event_stream_ordering >= ?"
" AND out.instance_name = ?"
" ORDER BY event_stream_ordering DESC"
)
if self.STATE_KEY_IN_EVENTS:
sql = (
"SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
" e.state_key, redacts, relates_to_id"
" FROM events AS e"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > event_stream_ordering"
" AND event_stream_ordering >= ?"
" AND out.instance_name = ?"
" ORDER BY event_stream_ordering DESC"
)
else:
sql = (
"SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id"
" FROM events AS e"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > event_stream_ordering"
" AND event_stream_ordering >= ?"
" AND out.instance_name = ?"
" ORDER BY event_stream_ordering DESC"
)
txn.execute(sql, (-last_id, -upper_bound, instance_name))
# Type safety: iterating over `txn` yields `Tuple`, i.e.
# `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a

View File

@@ -122,7 +122,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
logger.info("[purge] looking for events to delete")
should_delete_expr = "state_events.state_key IS NULL"
should_delete_expr = (
"e.state_key IS NULL"
if self.STATE_KEY_IN_EVENTS
else "state_events.state_key IS NULL"
)
should_delete_params: Tuple[Any, ...] = ()
if not delete_local_events:
should_delete_expr += " AND event_id NOT LIKE ?"
@@ -134,12 +138,23 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# Note that we insert events that are outliers and aren't going to be
# deleted, as nothing will happen to them.
if self.STATE_KEY_IN_EVENTS:
sqlf = """
INSERT INTO events_to_purge
SELECT event_id, %s
FROM events AS e
WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
"""
else:
sqlf = """
INSERT INTO events_to_purge
SELECT event_id, %s
FROM events AS e LEFT JOIN state_events USING (event_id)
WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
"""
txn.execute(
"INSERT INTO events_to_purge"
" SELECT event_id, %s"
" FROM events AS e LEFT JOIN state_events USING (event_id)"
" WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?"
% (should_delete_expr, should_delete_expr),
sqlf % (should_delete_expr, should_delete_expr),
should_delete_params,
)

View File

@@ -22,10 +22,19 @@ logger = logging.getLogger(__name__)
class RejectionsStore(SQLBaseStore):
async def get_rejection_reason(self, event_id: str) -> Optional[str]:
return await self.db_pool.simple_select_one_onecol(
table="rejections",
retcol="reason",
keyvalues={"event_id": event_id},
allow_none=True,
desc="get_rejection_reason",
)
if self.STATE_KEY_IN_EVENTS:
return await self.db_pool.simple_select_one_onecol(
table="events",
retcol="rejection_reason",
keyvalues={"event_id": event_id},
allow_none=True,
desc="get_rejection_reason",
)
else:
return await self.db_pool.simple_select_one_onecol(
table="rejections",
retcol="reason",
keyvalues={"event_id": event_id},
allow_none=True,
desc="get_rejection_reason",
)
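Either way the caller is unchanged; a usage sketch (editor's illustration, assuming `store` is an instance with this mixin and using a made-up event ID):

reason = await store.get_rejection_reason("$made_up_event:example.com")
if reason is None:
    print("event was not rejected")
else:
    print(f"event was rejected: {reason}")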

View File

@@ -0,0 +1,47 @@
# Copyright 2022 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from synapse.storage.types import Cursor
def run_create(cur: Cursor, database_engine, *args, **kwargs):
"""Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""
# we know that any new events will have the columns populated (and that has been
# the case since schema_version 68, so there is no chance of rolling back now).
#
# So, we only need to make sure that existing rows are updated. We read the
# current min and max stream orderings, since that is guaranteed to include all
# the events that were stored before the new columns were added.
cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
(min_stream_ordering, max_stream_ordering) = cur.fetchone()
if min_stream_ordering is None:
# no rows, nothing to do.
return
cur.execute(
"INSERT into background_updates (ordering, update_name, progress_json)"
" VALUES (7203, 'events_populate_state_key_rejections', ?)",
(
json.dumps(
{
"min_stream_ordering_exclusive": min_stream_ordering - 1,
"max_stream_ordering_inclusive": max_stream_ordering,
}
),
),
)
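For reference, an editor's illustration (values made up) of the progress dict this delta seeds and that the background update handler above consumes:

# e.g. with MIN(stream_ordering) = 1 and MAX(stream_ordering) = 250 at delta time
progress_json = {
    "min_stream_ordering_exclusive": 0,    # MIN(stream_ordering) - 1
    "max_stream_ordering_inclusive": 250,  # MAX(stream_ordering)
}
# _background_events_populate_state_key_rejections reads these two keys and
# advances min_stream_ordering_exclusive batch by batch until it is done.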