mirror of https://github.com/element-hq/synapse.git (synced 2025-12-05 01:10:13 +00:00)

Compare commits: `anoa/updat...` to `rav/drop_s...` (2 commits)

| SHA1 |
|---|
| 9a9a4d9099 |
| c3132e22fd |
changelog.d/13215.misc (new file, 1 line):

```
Preparation for database schema simplifications: populate `state_key` and `rejection_reason` for existing rows in the `events` table.
```
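Taken together, the commits implement a standard two-phase schema migration: new writes already populate `state_key` and `rejection_reason` on `events` (the delta below notes this has been true since schema version 68), every read path is gated on a `STATE_KEY_IN_EVENTS` flag so the old and new schemas can coexist, and a background update backfills historical rows. A minimal sketch of the flag-gated dual-read idea (hypothetical `MiniStore`, with dicts standing in for the real tables):

```python
# Sketch only: in Synapse the stores run SQL against `events`/`rejections`;
# here dicts stand in for those tables.
from typing import Dict, Optional


class MiniStore:
    # stays False until the backfill has populated every historical row
    STATE_KEY_IN_EVENTS = False

    def __init__(
        self,
        events: Dict[str, Dict[str, Optional[str]]],
        rejections: Dict[str, str],
    ) -> None:
        self.events = events          # event_id -> row in `events`
        self.rejections = rejections  # event_id -> reason (old-world table)

    def get_rejection_reason(self, event_id: str) -> Optional[str]:
        if self.STATE_KEY_IN_EVENTS:
            # new world: the reason lives on the events row itself
            return self.events[event_id].get("rejection_reason")
        # old world: look it up in the separate rejections table
        return self.rejections.get(event_id)


store = MiniStore(
    events={"$ev": {"rejection_reason": "auth_error"}},
    rejections={"$ev": "auth_error"},
)
print(store.get_rejection_reason("$ev"))  # auth_error (via the old path)
```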
synapse/storage/_base.py (`SQLBaseStore`):

```diff
@@ -36,6 +36,10 @@ class SQLBaseStore(metaclass=ABCMeta):
         per data store (and not one per physical database).
     """
 
+    # if set to False, we will query the `state_events` and `rejections` tables when
+    # fetching event data. When True, we rely on it all being in the `events` table.
+    STATE_KEY_IN_EVENTS = False
+
    def __init__(
        self,
        database: DatabasePool,
```
synapse/storage/databases/main/events.py (`PersistEventsStore`):

```diff
@@ -257,17 +257,30 @@ class PersistEventsStore:
         def _get_events_which_are_prevs_txn(
             txn: LoggingTransaction, batch: Collection[str]
         ) -> None:
-            sql = """
-                SELECT prev_event_id, internal_metadata
-                FROM event_edges
-                    INNER JOIN events USING (event_id)
-                    LEFT JOIN rejections USING (event_id)
-                    LEFT JOIN event_json USING (event_id)
-                WHERE
-                    NOT events.outlier
-                    AND rejections.event_id IS NULL
-                    AND
-            """
+            if self.store.STATE_KEY_IN_EVENTS:
+                sql = """
+                    SELECT prev_event_id, internal_metadata
+                    FROM event_edges
+                        INNER JOIN events USING (event_id)
+                        LEFT JOIN event_json USING (event_id)
+                    WHERE
+                        NOT events.outlier
+                        AND events.rejection_reason IS NULL
+                        AND
+                """
+
+            else:
+                sql = """
+                    SELECT prev_event_id, internal_metadata
+                    FROM event_edges
+                        INNER JOIN events USING (event_id)
+                        LEFT JOIN rejections USING (event_id)
+                        LEFT JOIN event_json USING (event_id)
+                    WHERE
+                        NOT events.outlier
+                        AND rejections.event_id IS NULL
+                        AND
+                """
 
             clause, args = make_in_list_sql_clause(
                 self.database_engine, "prev_event_id", batch
```
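Both variants of the query end with a dangling `AND` because the final restriction is an IN-list generated by `make_in_list_sql_clause`. A simplified stand-in for that helper (the real one also takes a `database_engine` argument and may emit `= ANY(?)` on PostgreSQL):

```python
from typing import Collection, List, Tuple


def make_in_list_sql_clause_simple(
    column: str, iterable: Collection[str]
) -> Tuple[str, List[str]]:
    """Simplified stand-in: build "column IN (?,?,...)" plus its args."""
    placeholders = ",".join("?" for _ in iterable)
    return f"{column} IN ({placeholders})", list(iterable)


clause, args = make_in_list_sql_clause_simple("prev_event_id", ["$a", "$b"])
sql = "SELECT ... WHERE NOT events.outlier AND " + clause
print(sql)   # SELECT ... WHERE NOT events.outlier AND prev_event_id IN (?,?)
print(args)  # ['$a', '$b']
```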
synapse/storage/databases/main/events.py, continued:

```diff
@@ -311,7 +324,19 @@ class PersistEventsStore:
         ) -> None:
             to_recursively_check = batch
 
-            while to_recursively_check:
+            if self.store.STATE_KEY_IN_EVENTS:
+                sql = """
+                    SELECT
+                        event_id, prev_event_id, internal_metadata,
+                        events.rejection_reason IS NOT NULL
+                    FROM event_edges
+                        INNER JOIN events USING (event_id)
+                        LEFT JOIN event_json USING (event_id)
+                    WHERE
+                        NOT events.outlier
+                        AND
+                """
+            else:
                 sql = """
                     SELECT
                         event_id, prev_event_id, internal_metadata,
@@ -325,6 +350,7 @@ class PersistEventsStore:
                         AND
                 """
 
+            while to_recursively_check:
                 clause, args = make_in_list_sql_clause(
                     self.database_engine, "event_id", to_recursively_check
                 )
@@ -530,6 +556,7 @@ class PersistEventsStore:
         event_to_room_id = {e.event_id: e.room_id for e in state_events.values()}
 
         self._add_chain_cover_index(
+            self.store.STATE_KEY_IN_EVENTS,
             txn,
             self.db_pool,
             self.store.event_chain_id_gen,
@@ -541,6 +568,7 @@ class PersistEventsStore:
     @classmethod
     def _add_chain_cover_index(
         cls,
+        state_key_in_events: bool,
         txn: LoggingTransaction,
         db_pool: DatabasePool,
         event_chain_id_gen: SequenceGenerator,
@@ -551,6 +579,8 @@ class PersistEventsStore:
         """Calculate the chain cover index for the given events.
 
         Args:
+            state_key_in_events: whether to use the `state_key` column in the `events`
+                table in preference to the `state_events` table
             event_to_room_id: Event ID to the room ID of the event
             event_to_types: Event ID to type and state_key of the event
             event_to_auth_chain: Event ID to list of auth event IDs of the
@@ -610,7 +640,15 @@ class PersistEventsStore:
 
         # We loop here in case we find an out of band membership and need to
         # fetch their auth event info.
-        while missing_auth_chains:
+        if state_key_in_events:
+            sql = """
+                SELECT event_id, events.type, events.state_key, chain_id, sequence_number
+                FROM events
+                    LEFT JOIN event_auth_chains USING (event_id)
+                WHERE
+                    events.state_key IS NOT NULL AND
+            """
+        else:
             sql = """
                 SELECT event_id, events.type, se.state_key, chain_id, sequence_number
                 FROM events
@@ -618,6 +656,8 @@ class PersistEventsStore:
                     LEFT JOIN event_auth_chains USING (event_id)
                 WHERE
             """
 
+        while missing_auth_chains:
+
             clause, args = make_in_list_sql_clause(
                 txn.database_engine,
                 "event_id",
```
synapse/storage/databases/main/events.py, continued:

```diff
@@ -1641,22 +1681,31 @@ class PersistEventsStore:
         ) -> None:
             to_prefill = []
 
             rows = []
 
             ev_map = {e.event_id: e for e, _ in events_and_contexts}
             if not ev_map:
                 return
 
-            sql = (
-                "SELECT "
-                " e.event_id as event_id, "
-                " r.redacts as redacts,"
-                " rej.event_id as rejects "
-                " FROM events as e"
-                " LEFT JOIN rejections as rej USING (event_id)"
-                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
-                " WHERE "
-            )
+            if self.store.STATE_KEY_IN_EVENTS:
+                sql = (
+                    "SELECT "
+                    " e.event_id as event_id, "
+                    " r.redacts as redacts,"
+                    " e.rejection_reason as rejects "
+                    " FROM events as e"
+                    " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+                    " WHERE "
+                )
+            else:
+                sql = (
+                    "SELECT "
+                    " e.event_id as event_id, "
+                    " r.redacts as redacts,"
+                    " rej.event_id as rejects "
+                    " FROM events as e"
+                    " LEFT JOIN rejections as rej USING (event_id)"
+                    " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+                    " WHERE "
+                )
 
             clause, args = make_in_list_sql_clause(
                 self.database_engine, "e.event_id", list(ev_map)
```
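One subtlety in this hunk: the old query's `rejects` column carried the rejected event's ID (`rej.event_id`), while the new one carries the rejection reason string (`e.rejection_reason`). The substitution is safe on the presumption that callers only test the column for truthiness:

```python
# Both the old value (an event ID) and the new value (a reason string)
# answer "was this event rejected?" identically under bool().
for rejects in ("$event_id:example.com", "auth_error", None):
    print(bool(rejects))  # True, True, False
```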
synapse/storage/databases/main/events_bg_updates.py (`EventsBackgroundUpdatesStore`):

```diff
@@ -67,6 +67,8 @@ class _BackgroundUpdates:
     EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
     EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
 
+    EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
+
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _CalculateChainCover:
@@ -253,6 +255,11 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             replaces_index="ev_edges_id",
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
+            self._background_events_populate_state_key_rejections,
+        )
+
     async def _background_reindex_fields_sender(
         self, progress: JsonDict, batch_size: int
     ) -> int:
```
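`register_background_update_handler` ties the update name to an async handler that the updater calls repeatedly, passing in the persisted progress dict, until the handler ends the update. A toy synchronous sketch of that contract (hypothetical `MiniUpdater`; Synapse's real scheduler is asynchronous and adapts the batch size as it goes):

```python
from typing import Callable, Dict, Set

# handler(progress, batch_size) -> number of items processed this round
Handler = Callable[[dict, int], int]


class MiniUpdater:
    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}
        self.progress: Dict[str, dict] = {}
        self.finished: Set[str] = set()

    def register_background_update_handler(self, name: str, handler: Handler) -> None:
        self.handlers[name] = handler

    def end_background_update(self, name: str) -> None:
        self.finished.add(name)

    def run_until_done(self, name: str, batch_size: int) -> None:
        # keep re-invoking the handler with its saved progress until done
        while name not in self.finished:
            self.handlers[name](self.progress[name], batch_size)


updater = MiniUpdater()
updater.progress["count_to_ten"] = {"n": 0}


def count_to_ten(progress: dict, batch_size: int) -> int:
    # one "batch" of work per call, persisting progress as we go
    progress["n"] = min(progress["n"] + batch_size, 10)
    if progress["n"] >= 10:
        updater.end_background_update("count_to_ten")
    return batch_size


updater.register_background_update_handler("count_to_ten", count_to_ten)
updater.run_until_done("count_to_ten", batch_size=3)
print(updater.progress["count_to_ten"])  # {'n': 10}
```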
synapse/storage/databases/main/events_bg_updates.py, continued:

```diff
@@ -441,6 +448,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             # First, we get `batch_size` events from the table, pulling out
             # their successor events, if any, and the successor events'
             # rejection status.
+
+            # this should happen before the bg update which drops 'rejections'
+            assert not self.STATE_KEY_IN_EVENTS
+
             txn.execute(
                 """SELECT prev_event_id, event_id, internal_metadata,
                     rejections.event_id IS NOT NULL, events.outlier
@@ -966,6 +977,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             extra_clause = "AND events.room_id = ?"
             tuple_args.append(last_room_id)
 
+        # this should happen before the bg update which drops 'state_events'
+        assert not self.STATE_KEY_IN_EVENTS
+
         sql = """
             SELECT
                 event_id, state_events.type, state_events.state_key,
@@ -1034,9 +1048,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
         # Calculate and persist the chain cover index for this set of events.
         #
-        # Annoyingly we need to gut wrench into the persit event store so that
+        # Annoyingly we need to gut wrench into the persist event store so that
         # we can reuse the function to calculate the chain cover for rooms.
         PersistEventsStore._add_chain_cover_index(
+            False,
             txn,
             self.db_pool,
             self.event_chain_id_gen,  # type: ignore[attr-defined]
@@ -1399,3 +1414,95 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             )
 
         return batch_size
+
+    async def _background_events_populate_state_key_rejections(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Back-populate `events.state_key` and `events.rejection_reason`"""
+
+        min_stream_ordering_exclusive = progress["min_stream_ordering_exclusive"]
+        max_stream_ordering_inclusive = progress["max_stream_ordering_inclusive"]
+
+        def _populate_txn(txn: LoggingTransaction) -> bool:
+            """Returns True if we're done."""
+
+            # first we need to find an endpoint.
+            # we need to find the final row in the batch of batch_size, which means
+            # we need to skip over (batch_size-1) rows and get the next row.
+            txn.execute(
+                """
+                SELECT stream_ordering FROM events
+                WHERE stream_ordering > ? AND stream_ordering <= ?
+                ORDER BY stream_ordering
+                LIMIT 1 OFFSET ?
+                """,
+                (
+                    min_stream_ordering_exclusive,
+                    max_stream_ordering_inclusive,
+                    batch_size - 1,
+                ),
+            )
+
+            endpoint = None
+            row = txn.fetchone()
+            if row:
+                endpoint = row[0]
+
+            where_clause = "e.stream_ordering > ?"
+            args = [min_stream_ordering_exclusive]
+            if endpoint:
+                where_clause += " AND e.stream_ordering <= ?"
+                args.append(endpoint)
+
+            # now do the updates. We consider rows within our range of stream orderings,
+            # but only those with a non-null rejection reason or state_key (since there
+            # is nothing to update for rows where rejection reason and state_key are
+            # both null).
+            txn.execute(
+                f"""
+                WITH t AS (
+                    SELECT e.event_id, r.reason, se.state_key
+                    FROM events e
+                        LEFT JOIN rejections r USING (event_id)
+                        LEFT JOIN state_events se USING (event_id)
+                    WHERE ({where_clause}) AND (
+                        r.reason IS NOT NULL OR se.state_key IS NOT NULL
+                    )
+                )
+                UPDATE events
+                SET rejection_reason=t.reason, state_key=t.state_key
+                FROM t WHERE events.event_id = t.event_id
+                """,
+                args,
+            )
+
+            logger.info(
+                "populated new `events` columns up to %s/%i: updated %i/%i rows",
+                endpoint,
+                max_stream_ordering_inclusive,
+                txn.rowcount,
+                batch_size,
+            )
+
+            if endpoint is None:
+                # we're done
+                return True
+
+            progress["min_stream_ordering_exclusive"] = endpoint
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
+                progress,
+            )
+            return False
+
+        done = await self.db_pool.runInteraction(
+            desc="events_populate_state_key_rejections", func=_populate_txn
+        )
+
+        if done:
+            await self.db_pool.updates._end_background_update(
+                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS
+            )
+
+        return batch_size
```
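The `_populate_txn` logic above is easy to exercise against a scratch database. Below is a self-contained `sqlite3` sketch of the same two steps: find the batch endpoint with `LIMIT 1 OFFSET batch_size - 1`, then backfill the range. It uses portable correlated subqueries in place of the `WITH ... UPDATE ... FROM` form, which requires PostgreSQL or SQLite 3.33+; the table shapes are simplified.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript(
    """
    CREATE TABLE events (
        event_id TEXT PRIMARY KEY,
        stream_ordering INTEGER,
        state_key TEXT,
        rejection_reason TEXT
    );
    CREATE TABLE rejections (event_id TEXT PRIMARY KEY, reason TEXT);
    CREATE TABLE state_events (event_id TEXT PRIMARY KEY, state_key TEXT);
    INSERT INTO events (event_id, stream_ordering)
        VALUES ('$a', 1), ('$b', 2), ('$c', 3);
    INSERT INTO rejections VALUES ('$b', 'auth_error');
    INSERT INTO state_events VALUES ('$c', '@alice:example.com');
    """
)

batch_size = 10
min_excl, max_incl = 0, 3

# Step 1: find the batch endpoint by skipping batch_size-1 rows.
cur.execute(
    "SELECT stream_ordering FROM events"
    " WHERE stream_ordering > ? AND stream_ordering <= ?"
    " ORDER BY stream_ordering LIMIT 1 OFFSET ?",
    (min_excl, max_incl, batch_size - 1),
)
row = cur.fetchone()
endpoint = row[0] if row else None  # None => fewer rows left than batch_size

where = "stream_ordering > ?"
args = [min_excl]
if endpoint is not None:
    where += " AND stream_ordering <= ?"
    args.append(endpoint)

# Step 2: backfill via correlated subqueries. (This touches every row in
# range; the real query's WITH/UPDATE..FROM form only touches rows that
# actually have a rejection reason or state_key to copy.)
cur.execute(
    f"""
    UPDATE events SET
        rejection_reason =
            (SELECT reason FROM rejections r WHERE r.event_id = events.event_id),
        state_key =
            (SELECT state_key FROM state_events se WHERE se.event_id = events.event_id)
    WHERE {where}
    """,
    args,
)
print(cur.execute(
    "SELECT event_id, state_key, rejection_reason FROM events ORDER BY stream_ordering"
).fetchall())
# [('$a', None, None), ('$b', None, 'auth_error'), ('$c', '@alice:example.com', None)]
# endpoint is None here, so the real handler would declare the update done.
```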
synapse/storage/databases/main/events_worker.py (`EventsWorkerStore`):

```diff
@@ -1482,20 +1482,35 @@ class EventsWorkerStore(SQLBaseStore):
         def get_all_new_forward_event_rows(
             txn: LoggingTransaction,
         ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
-            sql = (
-                "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
-                " FROM events AS e"
-                " LEFT JOIN redactions USING (event_id)"
-                " LEFT JOIN state_events AS se USING (event_id)"
-                " LEFT JOIN event_relations USING (event_id)"
-                " LEFT JOIN room_memberships USING (event_id)"
-                " LEFT JOIN rejections USING (event_id)"
-                " WHERE ? < stream_ordering AND stream_ordering <= ?"
-                " AND instance_name = ?"
-                " ORDER BY stream_ordering ASC"
-                " LIMIT ?"
-            )
+            if self.STATE_KEY_IN_EVENTS:
+                sql = (
+                    "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
+                    " e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL"
+                    " FROM events AS e"
+                    " LEFT JOIN redactions USING (event_id)"
+                    " LEFT JOIN event_relations USING (event_id)"
+                    " LEFT JOIN room_memberships USING (event_id)"
+                    " WHERE ? < stream_ordering AND stream_ordering <= ?"
+                    " AND instance_name = ?"
+                    " ORDER BY stream_ordering ASC"
+                    " LIMIT ?"
+                )
+            else:
+                sql = (
+                    "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
+                    " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
+                    " FROM events AS e"
+                    " LEFT JOIN redactions USING (event_id)"
+                    " LEFT JOIN state_events AS se USING (event_id)"
+                    " LEFT JOIN event_relations USING (event_id)"
+                    " LEFT JOIN room_memberships USING (event_id)"
+                    " LEFT JOIN rejections USING (event_id)"
+                    " WHERE ? < stream_ordering AND stream_ordering <= ?"
+                    " AND instance_name = ?"
+                    " ORDER BY stream_ordering ASC"
+                    " LIMIT ?"
+                )
 
             txn.execute(sql, (last_id, current_id, instance_name, limit))
             return cast(
                 List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
@@ -1523,21 +1538,36 @@ class EventsWorkerStore(SQLBaseStore):
         def get_ex_outlier_stream_rows_txn(
             txn: LoggingTransaction,
         ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
-            sql = (
-                "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
-                " FROM events AS e"
-                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
-                " LEFT JOIN redactions USING (event_id)"
-                " LEFT JOIN state_events AS se USING (event_id)"
-                " LEFT JOIN event_relations USING (event_id)"
-                " LEFT JOIN room_memberships USING (event_id)"
-                " LEFT JOIN rejections USING (event_id)"
-                " WHERE ? < event_stream_ordering"
-                " AND event_stream_ordering <= ?"
-                " AND out.instance_name = ?"
-                " ORDER BY event_stream_ordering ASC"
-            )
+            if self.STATE_KEY_IN_EVENTS:
+                sql = (
+                    "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
+                    " e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL"
+                    " FROM events AS e"
+                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
+                    " LEFT JOIN redactions USING (event_id)"
+                    " LEFT JOIN event_relations USING (event_id)"
+                    " LEFT JOIN room_memberships USING (event_id)"
+                    " WHERE ? < event_stream_ordering"
+                    " AND event_stream_ordering <= ?"
+                    " AND out.instance_name = ?"
+                    " ORDER BY event_stream_ordering ASC"
+                )
+            else:
+                sql = (
+                    "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
+                    " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
+                    " FROM events AS e"
+                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
+                    " LEFT JOIN redactions USING (event_id)"
+                    " LEFT JOIN state_events AS se USING (event_id)"
+                    " LEFT JOIN event_relations USING (event_id)"
+                    " LEFT JOIN room_memberships USING (event_id)"
+                    " LEFT JOIN rejections USING (event_id)"
+                    " WHERE ? < event_stream_ordering"
+                    " AND event_stream_ordering <= ?"
+                    " AND out.instance_name = ?"
+                    " ORDER BY event_stream_ordering ASC"
+                )
 
             txn.execute(sql, (last_id, current_id, instance_name))
             return cast(
@@ -1581,18 +1611,32 @@ class EventsWorkerStore(SQLBaseStore):
         def get_all_new_backfill_event_rows(
             txn: LoggingTransaction,
         ) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, str]]], int, bool]:
-            sql = (
-                "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " se.state_key, redacts, relates_to_id"
-                " FROM events AS e"
-                " LEFT JOIN redactions USING (event_id)"
-                " LEFT JOIN state_events AS se USING (event_id)"
-                " LEFT JOIN event_relations USING (event_id)"
-                " WHERE ? > stream_ordering AND stream_ordering >= ?"
-                " AND instance_name = ?"
-                " ORDER BY stream_ordering ASC"
-                " LIMIT ?"
-            )
+            if self.STATE_KEY_IN_EVENTS:
+                sql = (
+                    "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
+                    " e.state_key, redacts, relates_to_id"
+                    " FROM events AS e"
+                    " LEFT JOIN redactions USING (event_id)"
+                    " LEFT JOIN event_relations USING (event_id)"
+                    " WHERE ? > stream_ordering AND stream_ordering >= ?"
+                    " AND instance_name = ?"
+                    " ORDER BY stream_ordering ASC"
+                    " LIMIT ?"
+                )
+            else:
+                sql = (
+                    "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
+                    " se.state_key, redacts, relates_to_id"
+                    " FROM events AS e"
+                    " LEFT JOIN redactions USING (event_id)"
+                    " LEFT JOIN state_events AS se USING (event_id)"
+                    " LEFT JOIN event_relations USING (event_id)"
+                    " WHERE ? > stream_ordering AND stream_ordering >= ?"
+                    " AND instance_name = ?"
+                    " ORDER BY stream_ordering ASC"
+                    " LIMIT ?"
+                )
 
             txn.execute(sql, (-last_id, -current_id, instance_name, limit))
             new_event_updates: List[
                 Tuple[int, Tuple[str, str, str, str, str, str]]
@@ -1611,19 +1655,34 @@ class EventsWorkerStore(SQLBaseStore):
             else:
                 upper_bound = current_id
 
-            sql = (
-                "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " se.state_key, redacts, relates_to_id"
-                " FROM events AS e"
-                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
-                " LEFT JOIN redactions USING (event_id)"
-                " LEFT JOIN state_events AS se USING (event_id)"
-                " LEFT JOIN event_relations USING (event_id)"
-                " WHERE ? > event_stream_ordering"
-                " AND event_stream_ordering >= ?"
-                " AND out.instance_name = ?"
-                " ORDER BY event_stream_ordering DESC"
-            )
+            if self.STATE_KEY_IN_EVENTS:
+                sql = (
+                    "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
+                    " e.state_key, redacts, relates_to_id"
+                    " FROM events AS e"
+                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
+                    " LEFT JOIN redactions USING (event_id)"
+                    " LEFT JOIN event_relations USING (event_id)"
+                    " WHERE ? > event_stream_ordering"
+                    " AND event_stream_ordering >= ?"
+                    " AND out.instance_name = ?"
+                    " ORDER BY event_stream_ordering DESC"
+                )
+            else:
+                sql = (
+                    "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
+                    " se.state_key, redacts, relates_to_id"
+                    " FROM events AS e"
+                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
+                    " LEFT JOIN redactions USING (event_id)"
+                    " LEFT JOIN state_events AS se USING (event_id)"
+                    " LEFT JOIN event_relations USING (event_id)"
+                    " WHERE ? > event_stream_ordering"
+                    " AND event_stream_ordering >= ?"
+                    " AND out.instance_name = ?"
+                    " ORDER BY event_stream_ordering DESC"
+                )
 
             txn.execute(sql, (-last_id, -upper_bound, instance_name))
             # Type safety: iterating over `txn` yields `Tuple`, i.e.
             # `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a
```
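All four hunks in this file apply the same transformation: drop the `state_events` and `rejections` joins and read `e.state_key` / `e.rejection_reason` directly. Purely as an illustration of that shape (not how the PR structures it), the two variants of the first query could be derived from one template:

```python
def forward_event_rows_sql(state_key_in_events: bool) -> str:
    """Build the forward-events query for either schema generation."""
    if state_key_in_events:
        state_key_col = "e.state_key"
        rejected_expr = "e.rejection_reason IS NOT NULL"
        extra_joins = ""
    else:
        state_key_col = "se.state_key"
        rejected_expr = "rejections.reason IS NOT NULL"
        extra_joins = (
            " LEFT JOIN state_events AS se USING (event_id)"
            " LEFT JOIN rejections USING (event_id)"
        )
    return (
        "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
        f" {state_key_col}, redacts, relates_to_id, membership, {rejected_expr}"
        " FROM events AS e"
        " LEFT JOIN redactions USING (event_id)"
        " LEFT JOIN event_relations USING (event_id)"
        " LEFT JOIN room_memberships USING (event_id)"
        + extra_joins
        + " WHERE ? < stream_ordering AND stream_ordering <= ?"
        " AND instance_name = ?"
        " ORDER BY stream_ordering ASC"
        " LIMIT ?"
    )


print(forward_event_rows_sql(True))   # single-table variant
print(forward_event_rows_sql(False))  # join-based variant
```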
synapse/storage/databases/main/purge_events.py (`PurgeEventsStore`):

```diff
@@ -122,7 +122,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
 
         logger.info("[purge] looking for events to delete")
 
-        should_delete_expr = "state_events.state_key IS NULL"
+        should_delete_expr = (
+            "e.state_key IS NULL"
+            if self.STATE_KEY_IN_EVENTS
+            else "state_events.state_key IS NULL"
+        )
         should_delete_params: Tuple[Any, ...] = ()
         if not delete_local_events:
             should_delete_expr += " AND event_id NOT LIKE ?"
@@ -134,12 +138,23 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
 
         # Note that we insert events that are outliers and aren't going to be
         # deleted, as nothing will happen to them.
+        if self.STATE_KEY_IN_EVENTS:
+            sqlf = """
+                INSERT INTO events_to_purge
+                SELECT event_id, %s
+                FROM events AS e
+                WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
+            """
+        else:
+            sqlf = """
+                INSERT INTO events_to_purge
+                SELECT event_id, %s
+                FROM events AS e LEFT JOIN state_events USING (event_id)
+                WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
+            """
+
         txn.execute(
-            "INSERT INTO events_to_purge"
-            " SELECT event_id, %s"
-            " FROM events AS e LEFT JOIN state_events USING (event_id)"
-            " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?"
-            % (should_delete_expr, should_delete_expr),
+            sqlf % (should_delete_expr, should_delete_expr),
            should_delete_params,
        )
```
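Because `should_delete_expr` is spliced into the statement twice via `%`, any `?` placeholder inside it (such as the `event_id NOT LIKE ?` added when local events are kept) also appears twice in the final SQL, so its bind parameter has to be supplied twice in `should_delete_params`. Printing the composed statement makes that concrete (illustration for the new, join-free branch):

```python
should_delete_expr = "e.state_key IS NULL AND event_id NOT LIKE ?"
sqlf = """
    INSERT INTO events_to_purge
    SELECT event_id, %s
    FROM events AS e
    WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
"""
print(sqlf % (should_delete_expr, should_delete_expr))
# The NOT LIKE ? lands in both the SELECT column and the WHERE clause,
# followed by the room_id and topological_ordering placeholders.
```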
synapse/storage/databases/main/rejections.py (`RejectionsStore`):

```diff
@@ -22,10 +22,19 @@ logger = logging.getLogger(__name__)
 
 class RejectionsStore(SQLBaseStore):
     async def get_rejection_reason(self, event_id: str) -> Optional[str]:
-        return await self.db_pool.simple_select_one_onecol(
-            table="rejections",
-            retcol="reason",
-            keyvalues={"event_id": event_id},
-            allow_none=True,
-            desc="get_rejection_reason",
-        )
+        if self.STATE_KEY_IN_EVENTS:
+            return await self.db_pool.simple_select_one_onecol(
+                table="events",
+                retcol="rejection_reason",
+                keyvalues={"event_id": event_id},
+                allow_none=True,
+                desc="get_rejection_reason",
+            )
+        else:
+            return await self.db_pool.simple_select_one_onecol(
+                table="rejections",
+                retcol="reason",
+                keyvalues={"event_id": event_id},
+                allow_none=True,
+                desc="get_rejection_reason",
+            )
```
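`simple_select_one_onecol` reduces to a single-row, single-column SELECT, so the two branches above differ only in table and column names. A `sqlite3` sketch of the equivalent lookups (pared-down schema; the `allow_none=True` behaviour is modelled by returning `None` on a miss):

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (event_id TEXT PRIMARY KEY, rejection_reason TEXT);
    CREATE TABLE rejections (event_id TEXT PRIMARY KEY, reason TEXT);
    INSERT INTO events VALUES ('$ev', 'auth_error');
    INSERT INTO rejections VALUES ('$ev', 'auth_error');
    """
)


def get_rejection_reason(event_id: str, state_key_in_events: bool) -> Optional[str]:
    if state_key_in_events:
        sql = "SELECT rejection_reason FROM events WHERE event_id = ?"
    else:
        sql = "SELECT reason FROM rejections WHERE event_id = ?"
    row = conn.execute(sql, (event_id,)).fetchone()
    return row[0] if row else None  # allow_none=True behaviour


print(get_rejection_reason("$ev", True), get_rejection_reason("$ev", False))
# auth_error auth_error
```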
New schema delta (given the bg-update ordering 7203, presumably `synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py`):

```diff
@@ -0,0 +1,47 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from synapse.storage.types import Cursor
+
+
+def run_create(cur: Cursor, database_engine, *args, **kwargs):
+    """Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""
+
+    # we know that any new events will have the columns populated (and that has been
+    # the case since schema_version 68, so there is no chance of rolling back now).
+    #
+    # So, we only need to make sure that existing rows are updated. We read the
+    # current min and max stream orderings, since that is guaranteed to include all
+    # the events that were stored before the new columns were added.
+    cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
+    (min_stream_ordering, max_stream_ordering) = cur.fetchone()
+
+    if min_stream_ordering is None:
+        # no rows, nothing to do.
+        return
+
+    cur.execute(
+        "INSERT into background_updates (ordering, update_name, progress_json)"
+        " VALUES (7203, 'events_populate_state_key_rejections', ?)",
+        (
+            json.dumps(
+                {
+                    "min_stream_ordering_exclusive": min_stream_ordering - 1,
+                    "max_stream_ordering_inclusive": max_stream_ordering,
+                }
+            ),
+        ),
+    )
```
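For a database whose events span stream orderings 1 to 5000 (hypothetical values), the delta queues exactly one `background_updates` row:

```python
import json

min_stream_ordering, max_stream_ordering = 1, 5000  # hypothetical
row = (
    7203,  # ordering: schema version 72, delta 03
    "events_populate_state_key_rejections",
    json.dumps(
        {
            "min_stream_ordering_exclusive": min_stream_ordering - 1,
            "max_stream_ordering_inclusive": max_stream_ordering,
        }
    ),
)
print(row)
# (7203, 'events_populate_state_key_rejections',
#  '{"min_stream_ordering_exclusive": 0, "max_stream_ordering_inclusive": 5000}')
```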