Compare commits

...

35 Commits

Author SHA1 Message Date
Erik Johnston
918a5055ff Use fractions for ordering of chunks
Using floats turned out to be a bad idea, as it broke subtely if the
needed precision was too large. This PR replaces the implementation with
one that uses fractions and stores them in the database as two integers.
2018-06-05 16:40:16 +01:00
Erik Johnston
2d97fb6740 Implement backgroud update for chunks 2018-06-05 14:53:01 +01:00
Erik Johnston
fbafc86aca Assign chunks to forward extremities 2018-06-05 14:52:31 +01:00
Erik Johnston
9eaf69a386 Merge pull request #3315 from matrix-org/erikj/chunk_pag_1
Implement pagination using chunks
2018-06-01 15:17:58 +01:00
Erik Johnston
c33810d9cc Remove spurious conditional 2018-06-01 11:55:08 +01:00
Erik Johnston
58aadd3dd4 Remove spurious break 2018-06-01 11:54:24 +01:00
Erik Johnston
e7bb34b72a Use *row 2018-06-01 11:53:43 +01:00
Erik Johnston
9e7cf48461 Reuse stream_ordering attribute instead of order
The internal metadata "order" attribute was only used in one place,
which was equivalent to using the stream ordering anyway.
2018-06-01 11:51:11 +01:00
Erik Johnston
5bf4fa0fc4 Don't drop topo ordering when there is no chunk_id 2018-06-01 11:43:03 +01:00
Erik Johnston
80a877e9d9 Comment on stream vs topological vs depth ordering in schema 2018-06-01 11:31:16 +01:00
Erik Johnston
47b36e9a02 Update docs for RoomStreamToken 2018-06-01 11:19:57 +01:00
Erik Johnston
b671e57759 Implement pagination using chunks 2018-05-31 11:27:31 +01:00
Erik Johnston
bf599cdba1 Use calculated topological ordering when persisting events 2018-05-31 10:18:40 +01:00
Erik Johnston
6188512b18 Add chunk ID to pagination token 2018-05-31 10:04:33 +01:00
Erik Johnston
867132f28c Merge pull request #3240 from matrix-org/erikj/events_chunks
Compute new chunks for new events
2018-05-31 09:37:52 +01:00
Erik Johnston
384731330d Rename func to _insert_into_chunk_txn 2018-05-30 11:51:03 +01:00
Erik Johnston
9e1d3f119a Remove unnecessary COALESCE 2018-05-30 11:45:58 +01:00
Erik Johnston
f687d8fae2 Comments 2018-05-30 11:45:41 +01:00
Erik Johnston
ecd4931ab2 Just iterate once rather than create a new set 2018-05-30 11:35:02 +01:00
Erik Johnston
1cdd0d3b0d Remove redundant conditions 2018-05-30 11:33:57 +01:00
Erik Johnston
1810cc3f7e Remove unnecessary set 2018-05-30 11:32:27 +01:00
Erik Johnston
6c1d13a15a Correctly loop over events_and_contexts 2018-05-30 11:30:33 +01:00
Erik Johnston
13dbcafb9b Compute new chunks for new events
We also calculate a consistent topological ordering within a chunk, but
it isn't used yet.
2018-05-25 10:54:23 +01:00
Erik Johnston
bcc9e7f777 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/room_chunks 2018-05-25 10:53:43 +01:00
Erik Johnston
6e11803ed3 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/room_chunks 2018-05-23 10:54:14 +01:00
Erik Johnston
0a325e5385 Merge pull request #3226 from matrix-org/erikj/chunk_base
Begin adding implementing room chunks
2018-05-18 13:54:34 +01:00
Erik Johnston
b725e128f8 Comments 2018-05-18 13:43:01 +01:00
Erik Johnston
0504d809fd More comments 2018-05-17 17:08:36 +01:00
Erik Johnston
12fd6d7688 Document case of unconnected chunks 2018-05-17 16:07:20 +01:00
Erik Johnston
a638649254 Make insert_* functions internal and reorder funcs
This makes it clearer what the public interface is vs what subclasses
need to implement.
2018-05-17 15:10:23 +01:00
Erik Johnston
d4e4a7344f Increase range of rebalance interval
This both simplifies the code, and ensures that the target node is
roughly in the center of the range rather than at an end.
2018-05-17 15:09:31 +01:00
Erik Johnston
c771c124d5 Improve documentation and comments 2018-05-17 15:09:10 +01:00
Erik Johnston
3369354b56 Add note about index in changelog 2018-05-17 14:00:54 +01:00
Erik Johnston
3b505a80dc Merge branch 'develop' of github.com:matrix-org/synapse into erikj/chunk_base 2018-05-17 14:00:41 +01:00
Erik Johnston
943f1029d6 Begin adding implementing room chunks
This commit adds the necessary tables and columns, as well as an
implementation of an online topological sorting algorithm to maintain an
absolute ordering of the room chunks.
2018-05-17 12:05:22 +01:00
13 changed files with 1949 additions and 86 deletions

View File

@@ -1,3 +1,11 @@
Changes in <unreleased>
=======================
This release adds an index to the events table. This means that on first
startup there will be an inceased amount of IO until the index is created, and
an increase in disk usage.
Changes in synapse v0.30.0 (2018-05-24)
==========================================
@@ -53,7 +61,6 @@ Bug Fixes:
* Fix error in handling receipts (PR #3235)
* Stop the transaction cache caching failures (PR #3255)
Changes in synapse v0.29.1 (2018-05-17)
==========================================
Changes:

View File

@@ -235,7 +235,7 @@ class MessageHandler(BaseHandler):
room_id, max_topo
)
events, next_key = yield self.store.paginate_room_events(
events, next_key, extremities = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,

View File

@@ -514,7 +514,8 @@ class RoomEventSource(object):
events = list(room_events)
events.extend(e for evs, _ in room_to_events.values() for e in evs)
events.sort(key=lambda e: e.internal_metadata.order)
# Order by the stream ordering of the events.
events.sort(key=lambda e: e.internal_metadata.stream_ordering)
if limit:
events[:] = events[:limit]
@@ -534,7 +535,7 @@ class RoomEventSource(object):
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
events, next_key = yield self.store.paginate_room_events(
events, next_key, _ = yield self.store.paginate_room_events(
room_id=key,
from_key=config.from_key,
to_key=config.to_key,

View File

@@ -131,6 +131,7 @@ class DataStore(RoomMemberStore, RoomStore,
self._group_updates_id_gen = StreamIdGenerator(
db_conn, "local_group_updates", "stream_id",
)
self._chunk_id_gen = IdGenerator(db_conn, "events", "chunk_id")
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(

View File

@@ -0,0 +1,577 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import logging
from fractions import Fraction
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
from synapse.util.katriel_bodlaender import OrderedListStore
import synapse.metrics
metrics = synapse.metrics.get_metrics_for(__name__)
rebalance_counter = metrics.register_counter("rebalances")
logger = logging.getLogger(__name__)
class ChunkDBOrderedListStore(OrderedListStore):
"""Used as the list store for room chunks, efficiently maintaining them in
topological order on updates.
A room chunk is a connected portion of the room events DAG. Chunks are
constructed so that they have the additional property that for all events in
the chunk, either all of their prev_events are in that chunk or none of them
are. This ensures that no event that is subsequently received needs to be
inserted into the middle of a chunk, since it cannot both reference an event
in the chunk and be referenced by an event in the chunk (assuming no
cycles).
As such the set of chunks in a room inherits a DAG, i.e. if an event in one
chunk references an event in a second chunk, then we say that the first
chunk references the second, and thus forming a DAG. (This means that chunks
start off disconnected until an event is received that connects the two
chunks.)
We can therefore end up with multiple chunks in a room when the server
misses some events, e.g. due to the server being offline for a time.
The server may only have a subset of all events in a room, in which case
its possible for the server to have chunks that are unconnected from each
other. The ordering between unconnected chunks is arbitrary.
The class is designed for use inside transactions and so takes a
transaction object in the constructor. This means that it needs to be
re-instantiated in each transaction, so all state needs to be stored
in the database.
Internally the ordering is implemented using a linked list and assigning
each chunk a fraction. `get_next` and `get_prev` are implemented via linked
lists, and comparisons implemented using the fractions. When inserting
chunks fractions are picked such that their denominator is the smallest
possible. However, if the denominators grow too big then a rebalancing has
to take place to reduce the denominators; see `_rebalance` for details.
Note that OrderedListStore orders nodes such that source of an edge
comes before the target. This is counter intuitive when edges represent
causality, so for the purposes of ordering algorithm we invert the edge
directions, i.e. if chunk A has a prev chunk of B then we say that the
edge is from B to A. This ensures that newer chunks get inserted at the
end (rather than the start).
Note: Calls to `add_node` and `add_edge` cannot overlap for the same room,
and so callers should perform some form of per-room locking when using
this class.
Args:
txn
room_id (str)
clock
database_engine
rebalance_max_denominator (int): When a rebalance is triggered we
replace existing orders with those that have a denominator smaller
or equal to this
max_denominator (int): A rebalance is triggered when a node has an
ordering with a denominator greater than this
"""
def __init__(self,
txn, room_id, clock, database_engine,
rebalance_max_denominator=100,
max_denominator=100000):
self.txn = txn
self.room_id = room_id
self.clock = clock
self.database_engine = database_engine
self.rebalance_md = rebalance_max_denominator
self.max_denominator = max_denominator
def is_before(self, a, b):
"""Implements OrderedListStore"""
return self._get_order(a) < self._get_order(b)
def get_prev(self, node_id):
"""Implements OrderedListStore"""
sql = """
SELECT chunk_id FROM chunk_linearized
WHERE next_chunk_id = ?
"""
self.txn.execute(sql, (node_id,))
row = self.txn.fetchone()
if row:
return row[0]
return None
def get_next(self, node_id):
"""Implements OrderedListStore"""
sql = """
SELECT next_chunk_id FROM chunk_linearized
WHERE chunk_id = ?
"""
self.txn.execute(sql, (node_id,))
row = self.txn.fetchone()
if row:
return row[0]
return None
def _insert_before(self, node_id, target_id):
"""Implements OrderedListStore"""
rebalance = False # Set to true if we need to trigger a rebalance
if target_id:
before_id = self.get_prev(target_id)
if before_id:
new_order = self._insert_between(node_id, before_id, target_id)
else:
new_order = self._insert_at_start(node_id, target_id)
else:
# If target_id is None then we insert at the end.
self.txn.execute("""
SELECT chunk_id
FROM chunk_linearized
WHERE room_id = ? AND next_chunk_id is NULL
""", (self.room_id,))
row = self.txn.fetchone()
if row:
new_order = self._insert_at_end(node_id, row[0])
else:
new_order = self._insert_first(node_id)
rebalance = new_order.denominator > self.max_denominator
if rebalance:
self._rebalance(node_id)
def _insert_after(self, node_id, target_id):
"""Implements OrderedListStore"""
rebalance = False # Set to true if we need to trigger a rebalance
next_chunk_id = None
if target_id:
next_chunk_id = self.get_next(target_id)
if next_chunk_id:
new_order = self._insert_between(node_id, target_id, next_chunk_id)
else:
new_order = self._insert_at_end(node_id, target_id)
else:
# If target_id is None then we insert at the start.
self.txn.execute("""
SELECT chunk_id
FROM chunk_linearized
NATURAL JOIN chunk_linearized_first
WHERE room_id = ?
""", (self.room_id,))
row = self.txn.fetchone()
if row:
new_order = self._insert_at_start(node_id, row[0])
else:
new_order = self._insert_first(node_id)
rebalance = new_order.denominator > self.max_denominator
if rebalance:
self._rebalance(node_id)
def _insert_between(self, node_id, left_id, right_id):
"""Inserts node between given existing nodes.
"""
left_order = self._get_order(left_id)
right_order = self._get_order(right_id)
assert left_order < right_order
new_order = get_fraction_in_range(left_order, right_order)
SQLBaseStore._simple_update_one_txn(
self.txn,
table="chunk_linearized",
keyvalues={"chunk_id": left_id},
updatevalues={"next_chunk_id": node_id},
)
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_linearized",
values={
"chunk_id": node_id,
"room_id": self.room_id,
"next_chunk_id": right_id,
"numerator": int(new_order.numerator),
"denominator": int(new_order.denominator),
}
)
return new_order
def _insert_at_end(self, node_id, last_id):
"""Inserts node at the end using existing last node.
"""
last_order = self._get_order(last_id)
new_order = Fraction(int(math.ceil(last_order)) + 1, 1)
SQLBaseStore._simple_update_one_txn(
self.txn,
table="chunk_linearized",
keyvalues={"chunk_id": last_id},
updatevalues={"next_chunk_id": node_id},
)
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_linearized",
values={
"chunk_id": node_id,
"room_id": self.room_id,
"next_chunk_id": None,
"numerator": int(new_order.numerator),
"denominator": int(new_order.denominator),
}
)
return new_order
def _insert_at_start(self, node_id, first_id):
"""Inserts node at the start using existing first node.
"""
first_order = self._get_order(first_id)
new_order = get_fraction_in_range(0, first_order)
SQLBaseStore._simple_update_one_txn(
self.txn,
table="chunk_linearized_first",
keyvalues={"room_id": self.room_id},
updatevalues={"chunk_id": node_id},
)
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_linearized",
values={
"chunk_id": node_id,
"room_id": self.room_id,
"next_chunk_id": first_id,
"numerator": int(new_order.numerator),
"denominator": int(new_order.denominator),
}
)
return new_order
def _insert_first(self, node_id):
"""Inserts the first node for this room.
"""
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_linearized_first",
values={
"room_id": self.room_id,
"chunk_id": node_id,
},
)
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_linearized",
values={
"chunk_id": node_id,
"room_id": self.room_id,
"next_chunk_id": None,
"numerator": 1,
"denominator": 1,
}
)
return Fraction(1, 1)
def get_nodes_with_edges_to(self, node_id):
"""Implements OrderedListStore"""
# Note that we use the inverse relation here
sql = """
SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
INNER JOIN chunk_linearized AS l ON g.prev_id = l.chunk_id
WHERE g.chunk_id = ?
"""
self.txn.execute(sql, (node_id,))
return [(Fraction(n, d), c) for c, n, d in self.txn]
def get_nodes_with_edges_from(self, node_id):
"""Implements OrderedListStore"""
# Note that we use the inverse relation here
sql = """
SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
INNER JOIN chunk_linearized AS l ON g.chunk_id = l.chunk_id
WHERE g.prev_id = ?
"""
self.txn.execute(sql, (node_id,))
return [(Fraction(n, d), c) for c, n, d in self.txn]
def _delete_ordering(self, node_id):
"""Implements OrderedListStore"""
next_chunk_id = SQLBaseStore._simple_select_one_onecol_txn(
self.txn,
table="chunk_linearized",
keyvalues={
"chunk_id": node_id,
},
retcol="next_chunk_id",
)
SQLBaseStore._simple_delete_txn(
self.txn,
table="chunk_linearized",
keyvalues={"chunk_id": node_id},
)
sql = """
UPDATE chunk_linearized SET next_chunk_id = ?
WHERE next_chunk_id = ?
"""
self.txn.execute(sql, (next_chunk_id, node_id,))
sql = """
UPDATE chunk_linearized_first SET chunk_id = ?
WHERE chunk_id = ?
"""
self.txn.execute(sql, (next_chunk_id, node_id,))
def _add_edge_to_graph(self, source_id, target_id):
"""Implements OrderedListStore"""
# Note that we use the inverse relation
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_graph",
values={"chunk_id": target_id, "prev_id": source_id}
)
def _get_order(self, node_id):
"""Get the ordering of the given node.
"""
row = SQLBaseStore._simple_select_one_txn(
self.txn,
table="chunk_linearized",
keyvalues={"chunk_id": node_id},
retcols=("numerator", "denominator",),
)
return Fraction(row["numerator"], row["denominator"])
def _rebalance(self, node_id):
"""Rebalances the list around the given node to ensure that the
ordering denominators aren't too big.
This is done by starting at the given chunk and generating new orders
based on a Farey sequence of order `self.rebalance_md` for all
subsequent chunks that have an order less than that of the ordering
generated by the Farey sequence.
For example say we have chunks (and orders): A (23/90), B (24/91) and
C (2/3), and we have rebalance_md set to 5, a rebalancing would produce:
A: 23/90 -> 1/3
B: 24/91 -> 2/5
C: 2/3 (no change)
Since the farey sequence is 1/5, 1/4, 1/3, 2/5, 1/2, ... and 1/3 is the
smallest term greater than 23/90.
Note that we've extended Farey Sequence to be infinite by repeating the
sequence with an added integer. For example sequence with order 3:
0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ...
"""
logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id)
old_order = self._get_order(node_id)
a, b, c, d = find_farey_terms(old_order, self.rebalance_md)
assert old_order < Fraction(a, b)
assert b + d > self.rebalance_md
# Since we can easily produce farey sequence terms with an iterative
# algorithm, we can use WITH RECURSIVE to do so. This is less clear
# than doing it in python, but saves us being killed by the RTT to the
# DB if we need to rebalance a large number of nodes.
with_sql = """
WITH RECURSIVE chunks (chunk_id, next, n, a, b, c, d) AS (
SELECT chunk_id, next_chunk_id, ?, ?, ?, ?, ?
FROM chunk_linearized WHERE chunk_id = ?
UNION ALL
SELECT n.chunk_id, n.next_chunk_id, n,
c, d, ((n + b) / d) * c - a, ((n + b) / d) * d - b
FROM chunks AS c
INNER JOIN chunk_linearized AS l ON l.chunk_id = c.chunk_id
INNER JOIN chunk_linearized AS n ON n.chunk_id = l.next_chunk_id
WHERE c * 1.0 / d > n.numerator * 1.0 / n.denominator
)
"""
# Annoyingly, postgres 9.4 doesn't support the standard SQL subquery
# syntax for updates.
if isinstance(self.database_engine, PostgresEngine):
sql = with_sql + """
UPDATE chunk_linearized AS l
SET numerator = a, denominator = b
FROM chunks AS c
WHERE c.chunk_id = l.chunk_id
"""
else:
sql = with_sql + """
UPDATE chunk_linearized
SET (numerator, denominator) = (
SELECT a, b FROM chunks
WHERE chunks.chunk_id = chunk_linearized.chunk_id
)
WHERE chunk_id in (SELECT chunk_id FROM chunks)
"""
self.txn.execute(sql, (
self.rebalance_md, a, b, c, d, node_id
))
logger.info("Rebalanced %d chunks in room %s", self.txn.rowcount, self.room_id)
rebalance_counter.inc()
def get_fraction_in_range(min_frac, max_frac):
"""Gets a fraction in between the given numbers.
Uses Stern-Brocot tree to generate the fraction with the smallest
denominator.
See https://en.wikipedia.org/wiki/Stern%E2%80%93Brocot_tree
Args:
min_frac (numbers.Rational)
max_frac (numbers.Rational)
Returns:
numbers.Rational
"""
assert 0 <= min_frac < max_frac
# If the determinant is 1 then the fraction with smallest numerator and
# denominator in the range is the mediant, so we don't have to use the
# stern brocot tree to search for it.
determinant = (
min_frac.denominator * max_frac.numerator
- min_frac.numerator * max_frac.denominator
)
if determinant == 1:
return Fraction(
min_frac.numerator + max_frac.numerator,
min_frac.denominator + max_frac.denominator,
)
# This works by tracking two fractions a/b and c/d and repeatedly replacing
# one of them with their mediant, depending on if the mediant is smaller
# or greater than the specified range.
a, b, c, d = 0, 1, 1, 0
while True:
f = Fraction(a + c, b + d)
if f <= min_frac:
a, b, c, d = a + c, b + d, c, d
elif min_frac < f < max_frac:
return f
else:
a, b, c, d = a, b, a + c, b + d
def find_farey_terms(min_frac, max_denom):
"""Find the smallest pair of fractions that are part of the Farey sequence
of order `max_denom` (the ordered sequence of all fraction with denominator
less than or equal to max_denom).
This is useful as it can be fed into a simple iterative algorithm to
generate subsequent entries in the sequence.
A pair of fractions a/b, c/d are neighbours in the sequence of order
max(b, d) if and only if their determinant is one, i.e. bc - ad = 1. Note
that the next order sequence is generate by taking the mediants of the
previous order, so a/b and c/d are neighbours in all sequences with orders
between max(b, d) and b + d.
We can therefore use the Stern-Brocot tree to find the closest pair of
fractions to min_frac such that b + d is strictly greater than max_denom,
since all neighbouring fractions in Stern-Brocot satisfy the necessary
determinant property.
Note that we've extended Farey Sequence to be infinite by repeating the
sequence with an added integer. For example sequence with order 3:
0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ...
See https://en.wikipedia.org/wiki/Farey_sequence
Args:
min_frac (numbers.Rational)
max_frac (int)
Returns:
tuple[int, int, int, int]
"""
a, b, c, d = 0, 1, 1, 0
while True:
cur_frac = Fraction(a + c, b + d)
if b + d > max_denom:
break
if cur_frac <= min_frac:
a, b, c, d = a + c, b + d, c, d
elif min_frac < cur_frac:
a, b, c, d = a, b, a + c, b + d
# a/b may be smaller than min_frac, so we run the algorithm to generate
# next Farey sequence terms until a/b is strictly greater than min_frac
while Fraction(a, b) <= min_frac:
k = int((max_denom + b) / d)
a, b, c, d = c, d, k * c - a, k * d - b
assert min_frac < Fraction(a, b) < Fraction(c, d)
assert b * c - a * d == 1
return a, b, c, d

View File

@@ -23,6 +23,7 @@ import simplejson as json
from twisted.internet import defer
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
from synapse.util.async import ObservableDeferred
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import (
@@ -201,6 +202,7 @@ def _retry_on_integrity_error(func):
class EventsStore(EventsWorkerStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
EVENT_FIELDS_CHUNK = "event_fields_chunk_id"
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
@@ -232,6 +234,20 @@ class EventsStore(EventsWorkerStore):
psql_only=True,
)
self.register_background_index_update(
"events_chunk_index",
index_name="events_chunk_index",
table="events",
columns=["room_id", "chunk_id", "topological_ordering", "stream_ordering"],
unique=True,
psql_only=True,
)
self.register_background_update_handler(
self.EVENT_FIELDS_CHUNK,
self._background_compute_chunks,
)
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -1010,13 +1026,20 @@ class EventsStore(EventsWorkerStore):
}
)
sql = (
"UPDATE events SET outlier = ?"
" WHERE event_id = ?"
chunk_id, topo = self._insert_into_chunk_txn(
txn, event.room_id, event.event_id,
[eid for eid, _ in event.prev_events],
)
txn.execute(
sql,
(False, event.event_id,)
self._simple_update_txn(
txn,
table="events",
keyvalues={"event_id": event.event_id},
updatevalues={
"outlier": False,
"chunk_id": chunk_id,
"topological_ordering": topo,
},
)
# Update the event_backward_extremities table now that this
@@ -1099,13 +1122,22 @@ class EventsStore(EventsWorkerStore):
],
)
self._simple_insert_many_txn(
txn,
table="events",
values=[
{
for event, _ in events_and_contexts:
if event.internal_metadata.is_outlier():
chunk_id, topo = None, 0
else:
chunk_id, topo = self._insert_into_chunk_txn(
txn, event.room_id, event.event_id,
[eid for eid, _ in event.prev_events],
)
self._simple_insert_txn(
txn,
table="events",
values={
"stream_ordering": event.internal_metadata.stream_ordering,
"topological_ordering": event.depth,
"chunk_id": chunk_id,
"topological_ordering": topo,
"depth": event.depth,
"event_id": event.event_id,
"room_id": event.room_id,
@@ -1120,10 +1152,8 @@ class EventsStore(EventsWorkerStore):
"url" in event.content
and isinstance(event.content["url"], basestring)
),
}
for event, _ in events_and_contexts
],
)
},
)
def _store_rejected_events_txn(self, txn, events_and_contexts):
"""Add rows to the 'rejections' table for received events which were
@@ -1335,6 +1365,177 @@ class EventsStore(EventsWorkerStore):
(event.event_id, event.redacts)
)
def _insert_into_chunk_txn(self, txn, room_id, event_id, prev_event_ids):
"""Computes the chunk ID and topological ordering for an event and
handles updating chunk_graph table.
Args:
txn,
room_id (str)
event_id (str)
prev_event_ids (list[str])
Returns:
tuple[int, int]: Returns the chunk_id, topological_ordering for
the event
"""
# We calculate the chunk for an event using the following rules:
#
# 1. If all prev events have the same chunk ID then use that chunk ID
# 2. If we have none of the prev events but do have events pointing to
# the event, then we use their chunk ID if:
# - They're all in the same chunk, and
# - All their prev events match the events being inserted
# 3. Otherwise, create a new chunk and use that
# Set of chunks that the event refers to. Includes None if there were
# prev events that we don't have (or don't have a chunk for)
prev_chunk_ids = set()
for eid in prev_event_ids:
chunk_id = self._simple_select_one_onecol_txn(
txn,
table="events",
keyvalues={"event_id": eid},
retcol="chunk_id",
allow_none=True,
)
prev_chunk_ids.add(chunk_id)
forward_events = self._simple_select_onecol_txn(
txn,
table="event_edges",
keyvalues={
"prev_event_id": event_id,
"is_state": False,
},
retcol="event_id",
)
# Set of chunks that refer to this event.
forward_chunk_ids = set()
# All the prev_events of events in `forward_events`.
# Note that this will include the current event_id.
sibling_events = set()
for eid in forward_events:
chunk_id = self._simple_select_one_onecol_txn(
txn,
table="events",
keyvalues={"event_id": eid},
retcol="chunk_id",
allow_none=True,
)
if chunk_id is not None:
# chunk_id can be None if it's an outlier
forward_chunk_ids.add(chunk_id)
pes = self._simple_select_onecol_txn(
txn,
table="event_edges",
keyvalues={
"event_id": eid,
"is_state": False,
},
retcol="prev_event_id",
)
sibling_events.update(pes)
table = ChunkDBOrderedListStore(
txn, room_id, self.clock, self.database_engine,
)
# If there is only one previous chunk (and that isn't None), then this
# satisfies condition one.
if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids:
chunk_id = list(prev_chunk_ids)[0]
# This event is being inserted at the end of the chunk
new_topo = self._simple_select_one_onecol_txn(
txn,
table="events",
keyvalues={
"room_id": room_id,
"chunk_id": chunk_id,
},
retcol="MAX(topological_ordering)",
)
new_topo += 1
# If there is only one forward chunk and only one sibling event (which
# would be the given event), then this satisfies condition two.
elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1:
chunk_id = list(forward_chunk_ids)[0]
# This event is being inserted at the start of the chunk
new_topo = self._simple_select_one_onecol_txn(
txn,
table="events",
keyvalues={
"room_id": room_id,
"chunk_id": chunk_id,
},
retcol="MIN(topological_ordering)",
)
new_topo -= 1
else:
chunk_id = self._chunk_id_gen.get_next()
new_topo = 0
# We've generated a new chunk, so we have to tell the
# ChunkDBOrderedListStore about that.
table.add_node(chunk_id)
# We need to now update the database with any new edges between chunks
current_prev_ids = self._simple_select_onecol_txn(
txn,
table="chunk_graph",
keyvalues={
"chunk_id": chunk_id,
},
retcol="prev_id",
)
current_forward_ids = self._simple_select_onecol_txn(
txn,
table="chunk_graph",
keyvalues={
"prev_id": chunk_id,
},
retcol="chunk_id",
)
for pid in prev_chunk_ids:
if pid is not None and pid not in current_prev_ids and pid != chunk_id:
# Note that the edge direction is reversed than what you might
# expect. See ChunkDBOrderedListStore for more details.
table.add_edge(pid, chunk_id)
for fid in forward_chunk_ids:
# Note that the edge direction is reversed than what you might
# expect. See ChunkDBOrderedListStore for more details.
if fid not in current_forward_ids and fid != chunk_id:
table.add_edge(chunk_id, fid)
# We now need to update the backwards extremities for the chunks.
txn.executemany("""
INSERT INTO chunk_backwards_extremities (chunk_id, event_id)
SELECT ?, ? WHERE ? NOT IN (SELECT event_id FROM events)
""", [(chunk_id, eid, eid) for eid in prev_event_ids])
self._simple_delete_txn(
txn,
table="chunk_backwards_extremities",
keyvalues={"event_id": event_id},
)
return chunk_id, new_topo
@defer.inlineCallbacks
def have_events_in_timeline(self, event_ids):
"""Given a list of event ids, check if we have already processed and
@@ -1628,6 +1829,72 @@ class EventsStore(EventsWorkerStore):
defer.returnValue(result)
@defer.inlineCallbacks
def _background_compute_chunks(self, progress, batch_size):
"""Iterates over events and assigns them chunk IDs
"""
up_to_stream_id = progress.get("up_to_stream_id")
if up_to_stream_id is None:
up_to_stream_id = self.get_current_events_token() + 1
rows_inserted = progress.get("rows_inserted", 0)
def reindex_chunks_txn(txn):
txn.execute("""
SELECT stream_ordering, room_id, event_id FROM events
WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL
ORDER BY stream_ordering DESC
LIMIT ?
""", (up_to_stream_id, False, batch_size))
rows = txn.fetchall()
stream_ordering = up_to_stream_id
for stream_ordering, room_id, event_id in rows:
prev_events = self._simple_select_onecol_txn(
txn,
table="event_edges",
keyvalues={
"event_id": event_id,
},
retcol="prev_event_id",
)
chunk_id, topo = self._insert_into_chunk_txn(
txn, room_id, event_id, prev_events,
)
self._simple_update_txn(
txn,
table="events",
keyvalues={"event_id": event_id},
updatevalues={
"chunk_id": chunk_id,
"topological_ordering": topo,
},
)
progress = {
"up_to_stream_id": stream_ordering,
"rows_inserted": rows_inserted + len(rows)
}
self._background_update_progress_txn(
txn, self.EVENT_FIELDS_CHUNK, progress
)
return len(rows)
result = yield self.runInteraction(
self.EVENT_FIELDS_CHUNK, reindex_chunks_txn
)
if not result:
yield self._end_background_update(self.EVENT_FIELDS_CHUNK)
defer.returnValue(result)
def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
return -self._backfill_id_gen.get_current_token()

View File

@@ -0,0 +1,150 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage._base import SQLBaseStore, LoggingTransaction
from synapse.storage.prepare_database import get_statements
SQL = """
ALTER TABLE events ADD COLUMN chunk_id BIGINT;
-- FIXME: Add index on contains_url
INSERT INTO background_updates (update_name, progress_json) VALUES
('events_chunk_index', '{}');
-- Stores how chunks of graph relate to each other
CREATE TABLE chunk_graph (
chunk_id BIGINT NOT NULL,
prev_id BIGINT NOT NULL
);
CREATE UNIQUE INDEX chunk_graph_id ON chunk_graph (chunk_id, prev_id);
CREATE INDEX chunk_graph_prev_id ON chunk_graph (prev_id);
-- The extremities in each chunk. Note that these are pointing to events that
-- we don't have, rather than boundary between chunks.
CREATE TABLE chunk_backwards_extremities (
chunk_id BIGINT NOT NULL,
event_id TEXT NOT NULL
);
CREATE INDEX chunk_backwards_extremities_id ON chunk_backwards_extremities(
chunk_id, event_id
);
CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities(
event_id
);
-- Maintains an absolute ordering of chunks. Gets updated when we see new
-- edges between chunks.
CREATE TABLE chunk_linearized (
chunk_id BIGINT NOT NULL,
room_id TEXT NOT NULL,
next_chunk_id BIGINT, -- The chunk directly after this chunk, or NULL if last chunk
numerator BIGINT NOT NULL,
denominator BIGINT NOT NULL
);
CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
CREATE UNIQUE INDEX chunk_linearized_next_id ON chunk_linearized (
next_chunk_id, room_id
);
-- Records the first chunk in a room.
CREATE TABLE chunk_linearized_first (
chunk_id BIGINT NOT NULL,
room_id TEXT NOT NULL
);
CREATE UNIQUE INDEX chunk_linearized_first_id ON chunk_linearized_first (room_id);
INSERT into background_updates (update_name, progress_json)
VALUES ('event_fields_chunk_id', '{}');
"""
def run_create(cur, database_engine, *args, **kwargs):
for statement in get_statements(SQL.splitlines()):
cur.execute(statement)
txn = LoggingTransaction(
cur, "schema_update", database_engine, [], [],
)
rows = SQLBaseStore._simple_select_list_txn(
txn,
table="event_forward_extremities",
keyvalues={},
retcols=("event_id", "room_id",),
)
next_chunk_id = 1
room_to_next_order = {}
prev_chunks_by_room = {}
for row in rows:
chunk_id = next_chunk_id
next_chunk_id += 1
room_id = row["room_id"]
event_id = row["event_id"]
SQLBaseStore._simple_update_txn(
txn,
table="events",
keyvalues={"room_id": room_id, "event_id": event_id},
updatevalues={"chunk_id": chunk_id},
)
ordering = room_to_next_order.get(room_id, 1)
room_to_next_order[room_id] = ordering + 1
prev_chunks = prev_chunks_by_room.setdefault(room_id, [])
SQLBaseStore._simple_insert_txn(
txn,
table="chunk_linearized",
values={
"chunk_id": chunk_id,
"room_id": row["room_id"],
"numerator": ordering,
"denominator": 1,
},
)
if prev_chunks:
SQLBaseStore._simple_update_one_txn(
txn,
table="chunk_linearized",
keyvalues={"chunk_id": prev_chunks[-1]},
updatevalues={"next_chunk_id": chunk_id},
)
else:
SQLBaseStore._simple_insert_txn(
txn,
table="chunk_linearized_first",
values={
"chunk_id": chunk_id,
"room_id": row["room_id"],
},
)
prev_chunks.append(chunk_id)
def run_upgrade(*args, **kwargs):
pass

View File

@@ -14,7 +14,12 @@
*/
CREATE TABLE IF NOT EXISTS events(
-- Defines an ordering used to stream new events to clients. Events
-- fetched via backfill have negative values.
stream_ordering INTEGER PRIMARY KEY,
-- Defines a topological ordering of events within a chunk
-- (The concept of a chunk was added in later schemas, this used to
-- be set to the same value as the `depth` field in an event)
topological_ordering BIGINT NOT NULL,
event_id TEXT NOT NULL,
type TEXT NOT NULL,

View File

@@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
from synapse.storage.engines import PostgresEngine
import abc
@@ -62,24 +63,25 @@ _TOPOLOGICAL_TOKEN = "topological"
# Used as return values for pagination APIs
_EventDictReturn = namedtuple("_EventDictReturn", (
"event_id", "topological_ordering", "stream_ordering",
"event_id", "chunk_id", "topological_ordering", "stream_ordering",
))
def lower_bound(token, engine, inclusive=False):
inclusive = "=" if inclusive else ""
if token.topological is None:
if token.chunk is None:
return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
else:
if isinstance(engine, PostgresEngine):
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
# use the later form when running against postgres.
return "((%d,%d) <%s (%s,%s))" % (
token.topological, token.stream, inclusive,
return "(chunk_id = %d AND (%d,%d) <%s (%s,%s))" % (
token.chunk, token.topological, token.stream, inclusive,
"topological_ordering", "stream_ordering",
)
return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
return "(chunk_id = %d AND (%d < %s OR (%d = %s AND %d <%s %s)))" % (
token.chunk,
token.topological, "topological_ordering",
token.topological, "topological_ordering",
token.stream, inclusive, "stream_ordering",
@@ -88,18 +90,19 @@ def lower_bound(token, engine, inclusive=False):
def upper_bound(token, engine, inclusive=True):
inclusive = "=" if inclusive else ""
if token.topological is None:
if token.chunk is None:
return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
else:
if isinstance(engine, PostgresEngine):
# Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
# as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
# use the later form when running against postgres.
return "((%d,%d) >%s (%s,%s))" % (
token.topological, token.stream, inclusive,
return "(chunk_id = %d AND (%d,%d) >%s (%s,%s))" % (
token.chunk, token.topological, token.stream, inclusive,
"topological_ordering", "stream_ordering",
)
return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
return "(chunk_id = %d AND (%d > %s OR (%d = %s AND %d >%s %s)))" % (
token.chunk,
token.topological, "topological_ordering",
token.topological, "topological_ordering",
token.stream, inclusive, "stream_ordering",
@@ -275,7 +278,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit))
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
return rows
rows = yield self.runInteraction("get_room_events_stream_for_room", f)
@@ -325,7 +328,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
txn.execute(sql, (user_id, from_id, to_id,))
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
return rows
@@ -392,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
end_token = RoomStreamToken.parse(end_token)
rows, token = yield self.runInteraction(
rows, token, _ = yield self.runInteraction(
"get_recent_event_ids_for_room", self._paginate_room_events_txn,
room_id, from_token=end_token, limit=limit,
)
@@ -437,15 +440,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
`room_id` causes it to return the current room specific topological
token.
"""
token = yield self.get_room_max_stream_ordering()
if room_id is None:
defer.returnValue("s%d" % (token,))
token = yield self.get_room_max_stream_ordering()
defer.returnValue(str(RoomStreamToken(None, None, token)))
else:
topo = yield self.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn,
token = yield self.runInteraction(
"get_room_events_max_id", self._get_topological_token_for_room_txn,
room_id,
)
defer.returnValue("t%d-%d" % (topo, token))
if not token:
raise Exception("Server not in room")
defer.returnValue(str(token))
def get_stream_token_for_event(self, event_id):
"""The stream token for an event
@@ -460,7 +465,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
table="events",
keyvalues={"event_id": event_id},
retcol="stream_ordering",
).addCallback(lambda row: "s%d" % (row,))
).addCallback(lambda row: str(RoomStreamToken(None, None, row)))
def get_topological_token_for_event(self, event_id):
"""The stream token for an event
@@ -469,16 +474,34 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
Raises:
StoreError if the event wasn't in the database.
Returns:
A deferred "t%d-%d" topological token.
A deferred topological token.
"""
return self._simple_select_one(
table="events",
keyvalues={"event_id": event_id},
retcols=("stream_ordering", "topological_ordering"),
retcols=("stream_ordering", "topological_ordering", "chunk_id"),
desc="get_topological_token_for_event",
).addCallback(lambda row: "t%d-%d" % (
row["topological_ordering"], row["stream_ordering"],)
)
).addCallback(lambda row: str(RoomStreamToken(
row["chunk_id"],
row["topological_ordering"],
row["stream_ordering"],
)))
def _get_topological_token_for_room_txn(self, txn, room_id):
sql = """
SELECT chunk_id, topological_ordering, stream_ordering
FROM events
NATURAL JOIN event_forward_extremities
WHERE room_id = ?
ORDER BY stream_ordering DESC
LIMIT 1
"""
txn.execute(sql, (room_id,))
row = txn.fetchone()
if row:
c, t, s = row
return RoomStreamToken(c, t, s)
return None
def get_max_topological_token(self, room_id, stream_key):
sql = (
@@ -515,18 +538,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
null topological_ordering.
"""
for event, row in zip(events, rows):
chunk = row.chunk_id
topo = row.topological_ordering
stream = row.stream_ordering
if topo_order and row.topological_ordering:
topo = row.topological_ordering
else:
topo = None
internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream))
internal.order = (
int(topo) if topo else 0,
int(stream),
)
internal.stream_ordering = stream
if topo_order:
internal.before = str(RoomStreamToken(chunk, topo, stream - 1))
internal.after = str(RoomStreamToken(chunk, topo, stream))
else:
internal.before = str(RoomStreamToken(None, None, stream - 1))
internal.after = str(RoomStreamToken(None, None, stream))
@defer.inlineCallbacks
def get_events_around(self, room_id, event_id, before_limit, after_limit):
@@ -586,27 +611,29 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"event_id": event_id,
"room_id": room_id,
},
retcols=["stream_ordering", "topological_ordering"],
retcols=["stream_ordering", "topological_ordering", "chunk_id"],
)
# Paginating backwards includes the event at the token, but paginating
# forward doesn't.
before_token = RoomStreamToken(
results["topological_ordering"] - 1,
results["stream_ordering"],
results["chunk_id"],
results["topological_ordering"],
results["stream_ordering"] - 1,
)
after_token = RoomStreamToken(
results["chunk_id"],
results["topological_ordering"],
results["stream_ordering"],
)
rows, start_token = self._paginate_room_events_txn(
rows, start_token, _ = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
)
events_before = [r.event_id for r in rows]
rows, end_token = self._paginate_room_events_txn(
rows, end_token, _ = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
)
events_after = [r.event_id for r in rows]
@@ -689,12 +716,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
those that match the filter.
Returns:
Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
as a list of _EventDictReturn and a token that points to the end
of the result set.
Deferred[tuple[list[_EventDictReturn], str, list[int]]: Returns
the results as a list of _EventDictReturn, a token that points to
the end of the result set, and a list of chunks iterated over.
"""
assert int(limit) >= 0
limit = int(limit) # Sometimes we are passed a string from somewhere
assert limit >= 0
# There are two modes of fetching events: by stream order or by
# topological order. This is determined by whether the from_token is a
# stream or topological token. If stream then we can simply do a select
# ordered by stream_ordering column. If topological, then we need to
# fetch events from one chunk at a time until we hit the limit.
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
@@ -725,10 +759,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
bounds += " AND " + filter_clause
args.extend(filter_args)
args.append(int(limit))
args.append(limit)
sql = (
"SELECT event_id, topological_ordering, stream_ordering"
"SELECT event_id, chunk_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
@@ -740,9 +774,65 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn.execute(sql, args)
rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
rows = [_EventDictReturn(*row) for row in txn]
# If we are paginating topologically and we haven't hit the limit on
# number of events then we need to fetch events from the previous or
# next chunk.
iterated_chunks = []
chunk_id = None
if from_token.chunk: # FIXME: may be topological but no chunk.
if rows:
chunk_id = rows[-1].chunk_id
iterated_chunks = [r.chunk_id for r in rows]
else:
chunk_id = from_token.chunk
iterated_chunks = [chunk_id]
table = ChunkDBOrderedListStore(
txn, room_id, self.clock, self.database_engine,
)
if filter_clause:
filter_clause = "AND " + filter_clause
sql = (
"SELECT event_id, chunk_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE outlier = ? AND room_id = ? %(filter_clause)s"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s LIMIT ?"
) % {
"filter_clause": filter_clause,
"order": order,
}
args = [False, room_id] + filter_args + [limit]
while chunk_id and (limit <= 0 or len(rows) < limit):
if chunk_id not in iterated_chunks:
iterated_chunks.append(chunk_id)
if direction == 'b':
chunk_id = table.get_prev(chunk_id)
else:
chunk_id = table.get_next(chunk_id)
if chunk_id is None:
break
txn.execute(sql, args)
new_rows = [_EventDictReturn(*row) for row in txn]
rows.extend(new_rows)
# We may have inserted more rows than necessary in the loop above
rows = rows[:limit]
if rows:
chunk = rows[-1].chunk_id
topo = rows[-1].topological_ordering
toke = rows[-1].stream_ordering
if direction == 'b':
@@ -752,12 +842,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = RoomStreamToken(topo, toke)
next_token = RoomStreamToken(chunk, topo, toke)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token
return rows, str(next_token),
return rows, str(next_token), iterated_chunks,
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
@@ -777,18 +867,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
those that match the filter.
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "topological_ordering" and "stream_orderign".
tuple[list[dict], str, list[str]]: Returns the results as a list of
dicts, a token that points to the end of the result set, and a list
of backwards extremities. The dicts have the keys "event_id",
"topological_ordering" and "stream_ordering".
"""
from_key = RoomStreamToken.parse(from_key)
if to_key:
to_key = RoomStreamToken.parse(to_key)
rows, token = yield self.runInteraction(
"paginate_room_events", self._paginate_room_events_txn,
room_id, from_key, to_key, direction, limit, event_filter,
def _do_paginate_room_events(txn):
rows, token, chunks = self._paginate_room_events_txn(
txn, room_id, from_key, to_key, direction, limit, event_filter,
)
# We now fetch the extremities by fetching the extremities for
# each chunk we iterated over.
extremities = []
seen = set()
for chunk_id in chunks:
if chunk_id in seen:
continue
seen.add(chunk_id)
event_ids = self._simple_select_onecol_txn(
txn,
table="chunk_backwards_extremities",
keyvalues={"chunk_id": chunk_id},
retcol="event_id"
)
extremities.extend(e for e in event_ids if e not in extremities)
return rows, token, extremities
rows, token, extremities = yield self.runInteraction(
"paginate_room_events", _do_paginate_room_events,
)
events = yield self._get_events(
@@ -798,7 +913,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._set_before_and_after(events, rows)
defer.returnValue((events, token))
defer.returnValue((events, token, extremities))
class StreamStore(StreamWorkerStore):

View File

@@ -306,7 +306,7 @@ StreamToken.START = StreamToken(
)
class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
class RoomStreamToken(namedtuple("_StreamToken", ("chunk", "topological", "stream"))):
"""Tokens are positions between events. The token "s1" comes after event 1.
s0 s1
@@ -319,14 +319,18 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
When traversing the live event stream events are ordered by when they
arrived at the homeserver.
When traversing historic events the events are ordered by their depth in
the event graph "topological_ordering" and then by when they arrived at the
homeserver "stream_ordering".
When traversing historic events the events are ordered by the topological
ordering of the room graph. This is done using event chunks and the
`topological_ordering` column.
Live tokens start with an "s" followed by the "stream_ordering" id of the
event it comes after. Historic tokens start with a "t" followed by the
"topological_ordering" id of the event it comes after, followed by "-",
followed by the "stream_ordering" id of the event it comes after.
Live tokens start with an 's' and include the stream_ordering of the event
it comes after. Historic tokens start with a 'c' and include the chunk ID,
topological ordering and stream ordering of the event it comes after.
(In previous versions, when chunks were not implemented, the historic tokens
started with 't' and included the topological and stream ordering. These
tokens can be roughly converted to the new format by looking up the chunk
and topological ordering of the event with the same stream ordering).
"""
__slots__ = []
@@ -334,10 +338,19 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
def parse(cls, string):
try:
if string[0] == 's':
return cls(topological=None, stream=int(string[1:]))
if string[0] == 't':
return cls(chunk=None, topological=None, stream=int(string[1:]))
if string[0] == 't': # For backwards compat with older tokens.
parts = string[1:].split('-', 1)
return cls(topological=int(parts[0]), stream=int(parts[1]))
return cls(chunk=None, topological=int(parts[0]), stream=int(parts[1]))
if string[0] == 'c':
# We use '~' as both stream ordering and topological ordering
# can be negative, so we can't use '-'
parts = string[1:].split('~', 2)
return cls(
chunk=int(parts[0]),
topological=int(parts[1]),
stream=int(parts[2]),
)
except Exception:
pass
raise SynapseError(400, "Invalid token %r" % (string,))
@@ -346,12 +359,16 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
def parse_stream_token(cls, string):
try:
if string[0] == 's':
return cls(topological=None, stream=int(string[1:]))
return cls(chunk=None, topological=None, stream=int(string[1:]))
except Exception:
pass
raise SynapseError(400, "Invalid token %r" % (string,))
def __str__(self):
if self.chunk is not None:
# We use '~' as both stream ordering and topological ordering
# can be negative, so we can't use '-'
return "c%d~%d~%d" % (self.chunk, self.topological, self.stream)
if self.topological is not None:
return "t%d-%d" % (self.topological, self.stream)
else:

View File

@@ -0,0 +1,337 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains an implementation of the Katriel-Bodlaender algorithm,
which is used to do online topological ordering of graphs.
Note that the ordering derived from the graph is such that the source node of
an edge comes before the target node of the edge, i.e. a graph of A -> B -> C
would produce the ordering [A, B, C].
This ordering is therefore opposite to what one might expect when considering
the room DAG, as newer messages would be added to the start rather than the
end.
***The ChunkDBOrderedListStore therefore inverts the direction of edges***
See:
A tight analysis of the KatrielBodlaender algorithm for online topological
ordering
Hsiao-Fei Liua and Kun-Mao Chao
https://www.sciencedirect.com/science/article/pii/S0304397507006573
and:
Online Topological Ordering
Irit Katriel and Hans L. Bodlaender
http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.78.7933 )
"""
from abc import ABCMeta, abstractmethod
class OrderedListStore(object):
"""An abstract base class that is used to store a graph and maintain a
topological consistent, total ordering.
Internally this uses the Katriel-Bodlaender algorithm, which requires the
store expose an interface for the total ordering that supports:
- Insertion of the node into the ordering either immediately before or
after another node.
- Deletion of the node from the ordering
- Comparing the relative ordering of two arbitary nodes
- Get the node immediately before or after a given node in the ordering
It also needs to be able to interact with the graph in the following ways:
- Query the number of edges from a node in the graph
- Query the number of edges into a node in the graph
- Add an edge to the graph
Users of subclasses should call `add_node` and `add_edge` whenever editing
the graph. The total ordering exposed will remain constant until the next
call to one of these methods.
Note: Calls to `add_node` and `add_edge` cannot overlap, and so callers
should perform some form of locking.
"""
__metaclass__ = ABCMeta
def add_node(self, node_id):
"""Adds a node to the graph.
Args:
node_id (str)
"""
self._insert_before(node_id, None)
def add_edge(self, source, target):
"""Adds a new edge to the graph and updates the ordering.
See module level docs.
Note that both the source and target nodes must have been inserted into
the store (at an arbitrary position) already.
Args:
source (str): The source node of the new edge
target (str): The target node of the new edge
"""
# The following is the Katriel-Bodlaender algorithm.
to_s = []
from_t = []
to_s_neighbours = []
from_t_neighbours = []
to_s_indegree = 0
from_t_outdegree = 0
s = source
t = target
while s and t and not self.is_before(s, t):
m_s = to_s_indegree
m_t = from_t_outdegree
# These functions return a tuple where the first term is a float
# that can be used to order the the list of neighbours.
# These are valid until the next write
pe_s = self.get_nodes_with_edges_to(s)
fe_t = self.get_nodes_with_edges_from(t)
l_s = len(pe_s)
l_t = len(fe_t)
if m_s + l_s <= m_t + l_t:
to_s.append(s)
to_s_neighbours.extend(pe_s)
to_s_indegree += l_s
if to_s_neighbours:
to_s_neighbours.sort()
_, s = to_s_neighbours.pop()
else:
s = None
if m_s + l_s >= m_t + l_t:
from_t.append(t)
from_t_neighbours.extend(fe_t)
from_t_outdegree += l_t
if from_t_neighbours:
from_t_neighbours.sort(reverse=True)
_, t = from_t_neighbours.pop()
else:
t = None
if s is None:
s = self.get_prev(target)
if t is None:
t = self.get_next(source)
while to_s:
s1 = to_s.pop()
self._delete_ordering(s1)
self._insert_after(s1, s)
s = s1
while from_t:
t1 = from_t.pop()
self._delete_ordering(t1)
self._insert_before(t1, t)
t = t1
self._add_edge_to_graph(source, target)
@abstractmethod
def is_before(self, first_node, second_node):
"""Returns whether the first node is before the second node.
Args:
first_node (str)
second_node (str)
Returns:
bool: True if first_node is before second_node
"""
pass
@abstractmethod
def get_prev(self, node_id):
"""Gets the node immediately before the given node in the topological
ordering.
Args:
node_id (str)
Returns:
str|None: A node ID or None if no preceding node exists
"""
pass
@abstractmethod
def get_next(self, node_id):
"""Gets the node immediately after the given node in the topological
ordering.
Args:
node_id (str)
Returns:
str|None: A node ID or None if no proceding node exists
"""
pass
@abstractmethod
def get_nodes_with_edges_to(self, node_id):
"""Get all nodes with edges to the given node
Args:
node_id (str)
Returns:
list[tuple[float, str]]: Returns a list of tuple of an ordering
term and the node ID. The ordering term can be used to sort the
returned list.
The ordering is valid until subsequent calls to `add_edge`
functions
"""
pass
@abstractmethod
def get_nodes_with_edges_from(self, node_id):
"""Get all nodes with edges from the given node
Args:
node_id (str)
Returns:
list[tuple[float, str]]: Returns a list of tuple of an ordering
term and the node ID. The ordering term can be used to sort the
returned list.
The ordering is valid until subsequent calls to `add_edge`
functions
"""
pass
@abstractmethod
def _insert_before(self, node_id, target_id):
"""Inserts node immediately before target node.
If target_id is None then the node is inserted at the end of the list
Args:
node_id (str)
target_id (str|None)
"""
pass
@abstractmethod
def _insert_after(self, node_id, target_id):
"""Inserts node immediately after target node.
If target_id is None then the node is inserted at the start of the list
Args:
node_id (str)
target_id (str|None)
"""
pass
@abstractmethod
def _delete_ordering(self, node_id):
"""Deletes the given node from the ordered list (but not the graph).
Used when we want to reinsert it into a different position
Args:
node_id (str)
"""
pass
@abstractmethod
def _add_edge_to_graph(self, source_id, target_id):
"""Adds an edge to the graph from source to target.
Does not update ordering.
Args:
source_id (str)
target_id (str)
"""
pass
class InMemoryOrderedListStore(OrderedListStore):
"""An in memory OrderedListStore
"""
def __init__(self):
# The ordered list of nodes
self.list = []
# Map from node to set of nodes that it references
self.edges_from = {}
# Map from node to set of nodes that it is referenced by
self.edges_to = {}
def is_before(self, first_node, second_node):
return self.list.index(first_node) < self.list.index(second_node)
def get_prev(self, node_id):
idx = self.list.index(node_id) - 1
if idx >= 0:
return self.list[idx]
else:
return None
def get_next(self, node_id):
idx = self.list.index(node_id) + 1
if idx < len(self.list):
return self.list[idx]
else:
return None
def _insert_before(self, node_id, target_id):
if target_id is not None:
idx = self.list.index(target_id)
self.list.insert(idx, node_id)
else:
self.list.append(node_id)
def _insert_after(self, node_id, target_id):
if target_id is not None:
idx = self.list.index(target_id) + 1
self.list.insert(idx, node_id)
else:
self.list.insert(0, node_id)
def _delete_ordering(self, node_id):
self.list.remove(node_id)
def get_nodes_with_edges_to(self, node_id):
to_nodes = self.edges_to.get(node_id, [])
return [(self.list.index(nid), nid) for nid in to_nodes]
def get_nodes_with_edges_from(self, node_id):
from_nodes = self.edges_from.get(node_id, [])
return [(self.list.index(nid), nid) for nid in from_nodes]
def _add_edge_to_graph(self, source_id, target_id):
self.edges_from.setdefault(source_id, set()).add(target_id)
self.edges_to.setdefault(target_id, set()).add(source_id)

View File

@@ -0,0 +1,302 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
import itertools
import random
import tests.unittest
import tests.utils
from fractions import Fraction
from synapse.storage.chunk_ordered_table import (
ChunkDBOrderedListStore, find_farey_terms, get_fraction_in_range,
)
class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
"""Tests to ensure that the ordering and rebalancing functions of
ChunkDBOrderedListStore work as expected.
"""
def __init__(self, *args, **kwargs):
super(ChunkLinearizerStoreTestCase, self).__init__(*args, **kwargs)
@defer.inlineCallbacks
def setUp(self):
hs = yield tests.utils.setup_test_homeserver()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@defer.inlineCallbacks
def test_simple_insert_fetch(self):
room_id = "foo_room1"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 100,
)
table.add_node("A")
table._insert_after("B", "A")
table._insert_before("C", "A")
table._insert_after("D", "A")
sql = """
SELECT chunk_id, numerator, denominator FROM chunk_linearized
WHERE room_id = ?
"""
txn.execute(sql, (room_id,))
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
ordered = [c for _, c in ordered]
self.assertEqual(["C", "A", "D", "B"], ordered)
yield self.store.runInteraction("test", test_txn)
@defer.inlineCallbacks
def test_many_insert_fetch(self):
room_id = "foo_room2"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 100,
)
nodes = [(i, "node_%d" % (i,)) for i in xrange(1, 1000)]
expected = [n for _, n in nodes]
already_inserted = []
random.shuffle(nodes)
while nodes:
i, node_id = nodes.pop()
if not already_inserted:
table.add_node(node_id)
else:
for j, target_id in already_inserted:
if j > i:
break
if j < i:
table._insert_after(node_id, target_id)
else:
table._insert_before(node_id, target_id)
already_inserted.append((i, node_id))
already_inserted.sort()
sql = """
SELECT chunk_id, numerator, denominator FROM chunk_linearized
WHERE room_id = ?
"""
txn.execute(sql, (room_id,))
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
ordered = [c for _, c in ordered]
self.assertEqual(expected, ordered)
yield self.store.runInteraction("test", test_txn)
@defer.inlineCallbacks
def test_prepend_and_append(self):
room_id = "foo_room3"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 1000,
)
table.add_node("a")
expected = ["a"]
for i in xrange(1, 1000):
node_id = "node_id_before_%d" % i
table._insert_before(node_id, expected[0])
expected.insert(0, node_id)
for i in xrange(1, 1000):
node_id = "node_id_after_%d" % i
table._insert_after(node_id, expected[-1])
expected.append(node_id)
sql = """
SELECT chunk_id, numerator, denominator FROM chunk_linearized
WHERE room_id = ?
"""
txn.execute(sql, (room_id,))
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
ordered = [c for _, c in ordered]
self.assertEqual(expected, ordered)
yield self.store.runInteraction("test", test_txn)
@defer.inlineCallbacks
def test_worst_case(self):
room_id = "foo_room3"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 100,
)
table.add_node("a")
prev_node = "a"
expected_prefix = ["a"]
expected_suffix = []
for i in xrange(1, 100):
node_id = "node_id_%d" % i
if i % 2 == 0:
table._insert_before(node_id, prev_node)
expected_prefix.append(node_id)
else:
table._insert_after(node_id, prev_node)
expected_suffix.append(node_id)
prev_node = node_id
sql = """
SELECT chunk_id, numerator, denominator FROM chunk_linearized
WHERE room_id = ?
"""
txn.execute(sql, (room_id,))
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
ordered = [c for _, c in ordered]
expected = expected_prefix + list(reversed(expected_suffix))
self.assertEqual(expected, ordered)
yield self.store.runInteraction("test", test_txn)
@defer.inlineCallbacks
def test_get_edges_to(self):
room_id = "foo_room4"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 100,
)
table.add_node("A")
table._insert_after("B", "A")
table._add_edge_to_graph("A", "B")
table._insert_before("C", "A")
table._add_edge_to_graph("C", "A")
nodes = table.get_nodes_with_edges_from("A")
self.assertEqual([n for _, n in nodes], ["B"])
nodes = table.get_nodes_with_edges_to("A")
self.assertEqual([n for _, n in nodes], ["C"])
yield self.store.runInteraction("test", test_txn)
@defer.inlineCallbacks
def test_get_next_and_prev(self):
room_id = "foo_room5"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 100,
)
table.add_node("A")
table._insert_after("B", "A")
table._insert_before("C", "A")
self.assertEqual(table.get_next("A"), "B")
self.assertEqual(table.get_prev("A"), "C")
yield self.store.runInteraction("test", test_txn)
def test_find_farey_terms(self):
def _test(min_frac, max_denom):
""""Calls `find_farey_terms` with given values and checks they
are neighbours in the Farey Sequence.
"""
a, b, c, d = find_farey_terms(min_frac, max_denom)
p = Fraction(a, b)
q = Fraction(c, d)
assert min_frac < p < q
for x, y in _pairwise(_farey_generator(max_denom)):
if min_frac < x < y:
self.assertEqual(x, p)
self.assertEqual(y, q)
break
_test(Fraction(5, 3), 12)
_test(Fraction(1, 3), 12)
_test(Fraction(1, 2), 9)
_test(Fraction(1, 2), 10)
_test(Fraction(1, 2), 15)
def test_get_fraction_in_range(self):
def _test(x, y):
assert x < get_fraction_in_range(x, y) < y
_test(Fraction(1, 2), Fraction(2, 3))
_test(Fraction(1, 2), Fraction(3, 2))
_test(Fraction(5, 203), Fraction(6, 204))
def _farey_generator(n):
"""Generates Farey sequence of order `n`.
Note that this doesn't terminate.
Taken from https://en.wikipedia.org/wiki/Farey_sequence#Next_term
"""
a, b, c, d = 0, 1, 1, n
yield Fraction(a, b)
while True:
k = int((n + b) / d)
a, b, c, d = c, d, (k * c - a), (k * d - b)
yield Fraction(a, b)
def _pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return itertools.izip(a, b)

View File

@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.katriel_bodlaender import InMemoryOrderedListStore
from tests import unittest
class KatrielBodlaenderTests(unittest.TestCase):
def test_simple_graph(self):
store = InMemoryOrderedListStore()
nodes = [
"node_1",
"node_2",
"node_3",
"node_4",
]
for node in nodes:
store.add_node(node)
store.add_edge("node_2", "node_3")
store.add_edge("node_1", "node_2")
store.add_edge("node_3", "node_4")
self.assertEqual(nodes, store.list)
def test_reverse_graph(self):
store = InMemoryOrderedListStore()
nodes = [
"node_1",
"node_2",
"node_3",
"node_4",
]
for node in nodes:
store.add_node(node)
store.add_edge("node_3", "node_2")
store.add_edge("node_2", "node_1")
store.add_edge("node_4", "node_3")
self.assertEqual(list(reversed(nodes)), store.list)
def test_divergent_graph(self):
store = InMemoryOrderedListStore()
nodes = [
"node_1",
"node_2",
"node_3",
"node_4",
"node_5",
"node_6",
]
for node in reversed(nodes):
store.add_node(node)
store.add_edge("node_2", "node_3")
store.add_edge("node_2", "node_5")
store.add_edge("node_1", "node_2")
store.add_edge("node_3", "node_4")
store.add_edge("node_1", "node_3")
store.add_edge("node_4", "node_5")
store.add_edge("node_5", "node_6")
store.add_edge("node_4", "node_6")
self.assertEqual(nodes, store.list)