Compare commits

...

11 Commits

Author SHA1 Message Date
Andrew Morgan
866a592a6f Merge branch 'anoa/backfill_event_cache' into anoa/test_me 2022-10-18 09:55:45 +01:00
Andrew Morgan
e893697a94 Merge branch 'anoa/have_seen_events_no_cache' into anoa/test_me 2022-10-18 09:55:38 +01:00
Andrew Morgan
42f14104e3 fix wording 2022-10-18 09:50:37 +01:00
Andrew Morgan
52387d783c Add a test for backfill ignoring events we already have
And explicitly ensure that it ignores get_event_cache for now.
2022-10-18 09:41:51 +01:00
Andrew Morgan
a9709f0782 Fix up docstring 2022-10-18 09:23:50 +01:00
Andrew Morgan
74812f9e18 Add missing 'continue' 2022-10-18 09:23:37 +01:00
Andrew Morgan
1d24bd394d Remove test that checked for optimisation
a bit depressing
2022-10-12 19:07:02 +01:00
Andrew Morgan
4d80032518 changelog 2022-10-12 18:58:27 +01:00
Andrew Morgan
222d270e8a changelog 2022-10-12 18:31:23 +01:00
Andrew Morgan
46f5f30c15 Remove _get_events_cache check from _have_seen_events_dict
Checking this cache is currently an invalid assumption, as the _get_event_cache is not correctly
invalidated when purging events from a room. Remove this optimisation for now as its causing
more harm than good.

We can re-add it after fixing _get_event_cache.
2022-10-12 18:14:37 +01:00
Andrew Morgan
fc2286e9d4 Avoid checking the event cache when backfilling events 2022-10-12 18:11:40 +01:00
6 changed files with 154 additions and 45 deletions

1
changelog.d/14161.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.

1
changelog.d/14164.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.

View File

@@ -798,9 +798,42 @@ class FederationEventHandler:
],
)
# Check if we already any of these have these events.
# Note: we currently make a lookup in the database directly here rather than
# checking the event cache, due to:
# https://github.com/matrix-org/synapse/issues/13476
existing_events_map = await self._store._get_events_from_db(
[event.event_id for event in events]
)
new_events = []
for event in events:
event_id = event.event_id
# If we've already seen this event ID...
if event_id in existing_events_map:
existing_event = existing_events_map[event_id]
# ...and the event itself was not previously stored as an outlier...
if not existing_event.event.internal_metadata.is_outlier():
# ...then there's no need to persist it. We have it already.
logger.info(
"_process_pulled_event: Ignoring received event %s which we "
"have already seen",
event.event_id,
)
continue
# While we have seen this event before, it was stored as an outlier.
# We'll now persist it as a non-outlier.
logger.info("De-outliering event %s", event_id)
# Continue on with the events that are new to us.
new_events.append(event)
# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@@ -852,18 +885,6 @@ class FederationEventHandler:
event_id = event.event_id
existing = await self._store.get_event(
event_id, allow_none=True, allow_rejected=True
)
if existing:
if not existing.internal_metadata.is_outlier():
logger.info(
"_process_pulled_event: Ignoring received event %s which we have already seen",
event_id,
)
return
logger.info("De-outliering event %s", event_id)
try:
self._sanity_check_event(event)
except SynapseError as err:

View File

@@ -374,7 +374,7 @@ class EventsWorkerStore(SQLBaseStore):
If there is a mismatch, behave as per allow_none.
Returns:
The event, or None if the event was not found.
The event, or None if the event was not found and allow_none is `True`.
"""
if not isinstance(event_id, str):
raise TypeError("Invalid event event_id %r" % (event_id,))
@@ -1502,21 +1502,15 @@ class EventsWorkerStore(SQLBaseStore):
Returns:
a dict {event_id -> bool}
"""
# if the event cache contains the event, obviously we've seen it.
# TODO: We used to query the _get_event_cache here as a fast-path before
# hitting the database. For if an event were in the cache, we've presumably
# seen it before.
#
# But this is currently an invalid assumption due to the _get_event_cache
# not being invalidated when purging events from a room. The optimisation can
# be re-added after https://github.com/matrix-org/synapse/issues/13476
cache_results = {
event_id
for event_id in event_ids
if await self._get_event_cache.contains((event_id,))
}
results = dict.fromkeys(cache_results, True)
remaining = [
event_id for event_id in event_ids if event_id not in cache_results
]
if not remaining:
return results
def have_seen_events_txn(txn: LoggingTransaction) -> None:
def have_seen_events_txn(txn: LoggingTransaction) -> Dict[str, bool]:
# we deliberately do *not* query the database for room_id, to make the
# query an index-only lookup on `events_event_id_key`.
#
@@ -1524,16 +1518,17 @@ class EventsWorkerStore(SQLBaseStore):
sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause(
txn.database_engine, "e.event_id", remaining
txn.database_engine, "e.event_id", event_ids
)
txn.execute(sql + clause, args)
found_events = {eid for eid, in txn}
# ... and then we can update the results for each key
results.update({eid: (eid in found_events) for eid in remaining})
return {eid: (eid in found_events) for eid in event_ids}
await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
return results
return await self.db_pool.runInteraction(
"have_seen_events", have_seen_events_txn
)
@cached(max_entries=100000, tree=True)
async def have_seen_event(self, room_id: str, event_id: str) -> bool:

View File

@@ -19,7 +19,13 @@ from unittest.mock import Mock, patch
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseError
from synapse.api.errors import (
AuthError,
Codes,
LimitExceededError,
NotFoundError,
SynapseError,
)
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, make_event_from_dict
from synapse.federation.federation_base import event_from_pdu_json
@@ -28,6 +34,7 @@ from synapse.logging.context import LoggingContext, run_in_background
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.storage.databases.main.events_worker import EventCacheEntry
from synapse.util import Clock
from synapse.util.stringutils import random_string
@@ -322,6 +329,102 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
)
self.get_success(d)
def test_backfill_ignores_known_events(self) -> None:
"""
Tests that events that we already know about are ignored when backfilling.
"""
# Set up users
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
other_server = "otherserver"
other_user = "@otheruser:" + other_server
# Create a room to backfill events into
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(self.store.get_room_version(room_id))
# Build an event to backfill
event = event_from_pdu_json(
{
"type": EventTypes.Message,
"content": {"body": "hello world", "msgtype": "m.text"},
"room_id": room_id,
"sender": other_user,
"depth": 32,
"prev_events": [],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
room_version,
)
# Ensure the event is not already in the DB
self.get_failure(
self.store.get_event(event.event_id),
NotFoundError,
)
# Backfill the event and check that it has entered the DB.
# We mock out the FederationClient.backfill method, to pretend that a remote
# server has returned our fake event.
federation_client_backfill_mock = Mock(return_value=make_awaitable([event]))
self.hs.get_federation_client().backfill = federation_client_backfill_mock
# We also mock the persist method with a side effect of itself. This allows us
# to track when it has been called while preserving its function.
persist_events_and_notify_mock = Mock(
side_effect=self.hs.get_federation_event_handler().persist_events_and_notify
)
self.hs.get_federation_event_handler().persist_events_and_notify = (
persist_events_and_notify_mock
)
# Small side-tangent. We populate the event cache with the event, even though
# it is not yet in the DB. This is an invalid scenario that can currently occur
# due to not properly invalidating the event cache.
# See https://github.com/matrix-org/synapse/issues/13476.
#
# As a result, backfill should not rely on the event cache to check whether
# we already have an event in the DB.
# TODO: Remove this bit when the event cache is properly invalidated.
cache_entry = EventCacheEntry(
event=event,
redacted_event=None,
)
self.store._get_event_cache.set_local((event.event_id,), cache_entry)
# We now call FederationEventHandler.backfill (a separate method) to trigger
# a backfill request. It should receive the fake event.
self.get_success(
self.hs.get_federation_event_handler().backfill(
other_user,
room_id,
limit=10,
extremities=[],
)
)
# Check that our fake event was persisted.
persist_events_and_notify_mock.assert_called_once()
persist_events_and_notify_mock.reset_mock()
# Now we repeat the backfill, having the homeserver receive the fake event
# again.
self.get_success(
self.hs.get_federation_event_handler().backfill(
other_user,
room_id,
limit=10,
extremities=[],
),
)
# This time, we expect no event persistence to have occurred, as we already
# have this event.
persist_events_and_notify_mock.assert_not_called()
@unittest.override_config(
{"rc_invites": {"per_user": {"per_second": 0.5, "burst_count": 3}}}
)

View File

@@ -90,18 +90,6 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
def test_query_via_event_cache(self):
# fetch an event into the event cache
self.get_success(self.store.get_event(self.event_ids[0]))
# looking it up should now cause no db hits
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events(self.room_id, [self.event_ids[0]])
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
def test_persisting_event_invalidates_cache(self):
"""
Test to make sure that the `have_seen_event` cache