Compare commits

...

2 Commits

Author SHA1 Message Date
Andrew Morgan
3faa0974be Use _check_sigs_and_hash_and_fetch to validate backfill requests
I believe this method drops pdus (or at least returns None instead) which allows backfill to continue
working even when an event has an invalid signature
2020-09-18 12:07:16 +01:00
Erik Johnston
858ef5e144 Intelligently select extremities used in backfill.
Instead of just using the most recent extremities let's pick the
ones that will give us results that the pagination request cares about,
i.e. pick extremities only if they have a smaller depth than the
pagination token.

This is useful when we fail to backfill an extremity, as we no longer
get stuck requesting that same extremity repeatedly.
2020-09-18 12:04:04 +01:00
4 changed files with 58 additions and 24 deletions

View File

@@ -217,11 +217,8 @@ class FederationClient(FederationBase):
for p in transaction_data["pdus"]
]
# FIXME: We should handle signature failures more gracefully.
pdus[:] = await make_deferred_yieldable(
defer.gatherResults(
self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True,
).addErrback(unwrapFirstError)
pdus[:] = await self._check_sigs_and_hash_and_fetch(
dest, pdus, outlier=True, room_version=room_version
)
return pdus

View File

@@ -937,9 +937,18 @@ class FederationHandler(BaseHandler):
return events
async def maybe_backfill(self, room_id, current_depth):
async def maybe_backfill(self, room_id: str, current_depth: int, limit: int):
"""Checks the database to see if we should backfill before paginating,
and if so do.
Args:
room_id
current_depth: The depth from which we're paginating from. This is
used to decide if we should backfill and what extremities to
use.
limit: The number of events that the pagination request will
return. This is used as part of the heuristic to decide if we
should back paginate.
"""
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
@@ -998,16 +1007,47 @@ class FederationHandler(BaseHandler):
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
max_depth = sorted_extremeties_tuple[0][1]
# If we're approaching an extremity we trigger a backfill, otherwise we
# no-op.
if current_depth - 2 * limit > max_depth:
logger.debug(
"Not backfilling as we don't need to. %d < %d - 2 * %d",
max_depth,
current_depth,
limit,
)
return
logger.debug(
"room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s",
room_id,
current_depth,
max_depth,
sorted_extremeties_tuple,
)
# We ignore extremities that have a greater depth than our current depth
# as:
# 1. we don't really care about getting events that has happened
# before our current position; and
# 2. we have likely previously tried and failed to backfill from that
# extremity, so to avoid getting "stuck" requesting the same
# backfill repeatedly we drop those extremities.
filtered_sorted_extremeties_tuple = [
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
]
# However, we need to check that the filtered extremities are non-empty.
# If they are empty then either we can a) bail or b) still attempt to
# backill. We opt to try backfilling anyway just in case we do get
# relevant events.
if filtered_sorted_extremeties_tuple:
sorted_extremeties_tuple = filtered_sorted_extremeties_tuple
# We don't want to specify too many extremities as it causes the backfill
# request URI to be too long.
extremities = dict(sorted_extremeties_tuple[:5])
if current_depth > max_depth:
logger.debug(
"Not backfilling as we don't need to. %d < %d", max_depth, current_depth
)
return
# Now we need to decide which hosts to hit first.
# First we try hosts that are already in the room

View File

@@ -335,7 +335,7 @@ class PaginationHandler(object):
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = await self.store.get_max_topological_token(
max_topo = await self.store.get_current_topological_token(
room_id, room_token.stream
)
@@ -351,7 +351,7 @@ class PaginationHandler(object):
source_config.from_key = str(leave_token)
await self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
room_id, max_topo, limit=pagin_config.limit,
)
events, next_key = await self.store.paginate_room_events(

View File

@@ -605,23 +605,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
lambda row: "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
)
def get_max_topological_token(self, room_id, stream_key):
"""Get the max topological token in a room before the given stream
def get_current_topological_token(self, room_id, stream_key):
"""Gets the topological token in a room after or at the given stream
ordering.
Args:
room_id (str)
stream_key (int)
Returns:
Deferred[int]
room_id
stream_key
"""
sql = (
"SELECT coalesce(max(topological_ordering), 0) FROM events"
" WHERE room_id = ? AND stream_ordering < ?"
"SELECT coalesce(MIN(topological_ordering), 0) FROM events"
" WHERE room_id = ? AND stream_ordering >= ?"
)
return self.db_pool.execute(
"get_max_topological_token", None, sql, room_id, stream_key
"get_current_topological_token", None, sql, room_id, stream_key
).addCallback(lambda r: r[0][0] if r else 0)
def _get_max_topological_txn(self, txn, room_id):