Compare commits

...

6 Commits

9 changed files with 237 additions and 147 deletions

View File

@@ -45,16 +45,6 @@ jobs:
- run: poetry run scripts-dev/generate_sample_config.sh --check - run: poetry run scripts-dev/generate_sample_config.sh --check
- run: poetry run scripts-dev/config-lint.sh - run: poetry run scripts-dev/config-lint.sh
check-schema-delta:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.x"
- run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
- run: scripts-dev/check_schema_delta.py --force-colors
check-lockfile: check-lockfile:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
@@ -221,7 +211,6 @@ jobs:
- lint-newsfile - lint-newsfile
- lint-pydantic - lint-pydantic
- check-sampleconfig - check-sampleconfig
- check-schema-delta
- check-lockfile - check-lockfile
- lint-clippy - lint-clippy
- lint-rustfmt - lint-rustfmt
@@ -609,6 +598,16 @@ jobs:
- run: cargo bench --no-run - run: cargo bench --no-run
check-schema-delta:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.x"
- run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
- run: scripts-dev/check_schema_delta.py --force-colors
# a job which marks all the other jobs as complete, thus allowing PRs to be merged. # a job which marks all the other jobs as complete, thus allowing PRs to be merged.
tests-done: tests-done:
if: ${{ always() }} if: ${{ always() }}

1
changelog.d/15724.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix missing dependencies in background jobs.

View File

@@ -22,7 +22,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection, LoggingDatabaseConnection,
LoggingTransaction, LoggingTransaction,
) )
from synapse.storage.engines import PostgresEngine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import MutableStateMap, StateMap from synapse.types import MutableStateMap, StateMap
from synapse.types.state import StateFilter from synapse.types.state import StateFilter
from synapse.util.caches import intern_string from synapse.util.caches import intern_string
@@ -328,6 +328,15 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
columns=["event_stream_ordering"], columns=["event_stream_ordering"],
) )
self.db_pool.updates.register_background_update_handler(
"add_event_stream_ordering",
self._add_event_stream_ordering,
)
self.db_pool.updates.register_background_update_handler(
"add_stream_ordering_triggers", self._add_triggers_in_bg
)
async def _background_deduplicate_state( async def _background_deduplicate_state(
self, progress: dict, batch_size: int self, progress: dict, batch_size: int
) -> int: ) -> int:
@@ -504,3 +513,175 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
) )
return 1 return 1
async def _add_event_stream_ordering(self, progress: dict, batch_size: int) -> int:
"""
Add denormalised copies of `stream_ordering` from the corresponding row in `events`
to the tables current_state_events, local_current_membership, and room_memberships.
This is done to improve database performance by reduring JOINs.
"""
tables = [
"current_state_events",
"local_current_membership",
"room_memberships",
]
if isinstance(self.database_engine, PostgresEngine):
def check_pg_column(txn: LoggingTransaction, table: str) -> list:
"""
check if the column event_stream_ordering already exists
"""
check_sql = f"""
SELECT column_name FROM information_schema.columns
WHERE table_name = '{table}' and column_name = 'event_stream_ordering';
"""
txn.execute(check_sql)
column = txn.fetchall()
return column
def add_pg_column(txn: LoggingTransaction, table: str) -> None:
"""
Add column event_stream_ordering to A given table
"""
add_column_sql = f"""
ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT;
"""
txn.execute(add_column_sql)
add_fk_sql = f"""
ALTER TABLE {table} ADD CONSTRAINT event_stream_ordering_fkey
FOREIGN KEY(event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
"""
txn.execute(add_fk_sql)
for table in tables:
res = await self.db_pool.runInteraction(
"check_column", check_pg_column, table
)
# if the column exists do nothing
if not res:
await self.db_pool.runInteraction(
"add_event_stream_ordering",
add_pg_column,
table,
)
await self.db_pool.updates._end_background_update(
"add_event_stream_ordering"
)
return 1
elif isinstance(self.database_engine, Sqlite3Engine):
def check_sqlite_column(txn: LoggingTransaction, table: str) -> List[tuple]:
"""
Get table info (to see if column event_stream_ordering exists)
"""
check_sql = f"""
PRAGMA table_info({table})
"""
txn.execute(check_sql)
res = txn.fetchall()
return res
def add_sqlite_column(txn: LoggingTransaction, table: str) -> None:
"""
Add column event_stream_ordering to given table
"""
add_column_sql = f"""
ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
"""
txn.execute(add_column_sql)
for table in tables:
res = await self.db_pool.runInteraction(
"check_column", check_sqlite_column, table
)
columns = [tup[1] for tup in res]
# if the column exists do nothing
if "event_stream_ordering" not in columns:
await self.db_pool.runInteraction(
"add_event_stream_ordering", add_sqlite_column, table
)
await self.db_pool.updates._end_background_update(
"add_event_stream_ordering"
)
return 1
async def _add_triggers_in_bg(self, progress: dict, batch_size: int) -> int:
"""
Adds triggers to the room membership tables to enforce consistency
"""
# Complain if the `event_stream_ordering` in membership tables doesn't match
# the `stream_ordering` row with the same `event_id` in `events`.
if isinstance(self.database_engine, Sqlite3Engine):
def add_sqlite_triggers(txn: LoggingTransaction) -> None:
for table in (
"current_state_events",
"local_current_membership",
"room_memberships",
):
txn.execute(
f"""
CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
BEFORE INSERT ON {table}
FOR EACH ROW
BEGIN
SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
WHERE EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.stream_ordering != NEW.event_stream_ordering
);
END;
"""
)
await self.db_pool.runInteraction(
"add_sqlite_triggers", add_sqlite_triggers
)
elif isinstance(self.database_engine, PostgresEngine):
def add_pg_triggers(txn: LoggingTransaction) -> None:
txn.execute(
"""
CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
BEGIN
IF EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.stream_ordering != NEW.event_stream_ordering
) THEN
RAISE EXCEPTION 'Incorrect event_stream_ordering';
END IF;
RETURN NEW;
END;
$BODY$ LANGUAGE plpgsql;
"""
)
for table in (
"current_state_events",
"local_current_membership",
"room_memberships",
):
txn.execute(
f"""
CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
FOR EACH ROW
EXECUTE PROCEDURE check_event_stream_ordering()
"""
)
await self.db_pool.runInteraction("add_postgres_triggers", add_pg_triggers)
else:
raise NotImplementedError("Unknown database engine")
await self.db_pool.updates._end_background_update(
"add_stream_ordering_triggers"
)
return 1

View File

@@ -0,0 +1,18 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
VALUES
(7403, 'add_event_stream_ordering', '{}', 'replace_stream_ordering_column');

View File

@@ -1,29 +0,0 @@
/* Copyright 2022 Beeper
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
-- we use to improve database performance by reduring JOINs.
-- NOTE: these are set to NOT VALID to prevent locks while adding the column on large existing tables,
-- which will be validated in a later migration. For all new/updated rows the FKEY will be checked.
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE current_state_events ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE local_current_membership ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE room_memberships ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;

View File

@@ -1,23 +0,0 @@
/* Copyright 2022 Beeper
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
-- we use to improve database performance by reduring JOINs.
-- NOTE: sqlite does not support ADD CONSTRAINT so we add the new columns with FK constraint as-is
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);

View File

@@ -1,79 +0,0 @@
# Copyright 2022 Beeper
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This migration adds triggers to the room membership tables to enforce consistency.
Triggers cannot be expressed in .sql files, so we have to use a separate file.
"""
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
# Complain if the `event_stream_ordering` in membership tables doesn't match
# the `stream_ordering` row with the same `event_id` in `events`.
if isinstance(database_engine, Sqlite3Engine):
for table in (
"current_state_events",
"local_current_membership",
"room_memberships",
):
cur.execute(
f"""
CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
BEFORE INSERT ON {table}
FOR EACH ROW
BEGIN
SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
WHERE EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.stream_ordering != NEW.event_stream_ordering
);
END;
"""
)
elif isinstance(database_engine, PostgresEngine):
cur.execute(
"""
CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
BEGIN
IF EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.stream_ordering != NEW.event_stream_ordering
) THEN
RAISE EXCEPTION 'Incorrect event_stream_ordering';
END IF;
RETURN NEW;
END;
$BODY$ LANGUAGE plpgsql;
"""
)
for table in (
"current_state_events",
"local_current_membership",
"room_memberships",
):
cur.execute(
f"""
CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
FOR EACH ROW
EXECUTE PROCEDURE check_event_stream_ordering()
"""
)
else:
raise NotImplementedError("Unknown database engine")

View File

@@ -0,0 +1,22 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This migration adds triggers to the room membership tables to enforce consistency.
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
VALUES
(7404, 'add_stream_ordering_triggers', '{}', 'add_event_stream_ordering');

View File

@@ -13,8 +13,8 @@
* limitations under the License. * limitations under the License.
*/ */
INSERT INTO background_updates (ordering, update_name, progress_json) INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
VALUES VALUES
(7714, 'current_state_events_stream_ordering_idx', '{}'), (7714, 'current_state_events_stream_ordering_idx', '{}', 'add_event_stream_ordering'),
(7714, 'local_current_membership_stream_ordering_idx', '{}'), (7714, 'local_current_membership_stream_ordering_idx', '{}', 'add_event_stream_ordering'),
(7714, 'room_memberships_stream_ordering_idx', '{}'); (7714, 'room_memberships_stream_ordering_idx', '{}', 'add_event_stream_ordering');