Mirror of https://github.com/element-hq/synapse.git

Compare commits: madlittlem... → erikj/chun... (35 commits)
| SHA1 |
|---|
| 918a5055ff |
| 2d97fb6740 |
| fbafc86aca |
| 9eaf69a386 |
| c33810d9cc |
| 58aadd3dd4 |
| e7bb34b72a |
| 9e7cf48461 |
| 5bf4fa0fc4 |
| 80a877e9d9 |
| 47b36e9a02 |
| b671e57759 |
| bf599cdba1 |
| 6188512b18 |
| 867132f28c |
| 384731330d |
| 9e1d3f119a |
| f687d8fae2 |
| ecd4931ab2 |
| 1cdd0d3b0d |
| 1810cc3f7e |
| 6c1d13a15a |
| 13dbcafb9b |
| bcc9e7f777 |
| 6e11803ed3 |
| 0a325e5385 |
| b725e128f8 |
| 0504d809fd |
| 12fd6d7688 |
| a638649254 |
| d4e4a7344f |
| c771c124d5 |
| 3369354b56 |
| 3b505a80dc |
| 943f1029d6 |
@@ -1,3 +1,11 @@
+Changes in <unreleased>
+=======================
+
+This release adds an index to the events table. This means that on first
+startup there will be an increased amount of IO until the index is created, and
+an increase in disk usage.
+
+
 Changes in synapse v0.30.0 (2018-05-24)
 ==========================================
@@ -53,7 +61,6 @@ Bug Fixes:
 * Fix error in handling receipts (PR #3235)
 * Stop the transaction cache caching failures (PR #3255)
-

 Changes in synapse v0.29.1 (2018-05-17)
 ==========================================
 Changes:
@@ -235,7 +235,7 @@ class MessageHandler(BaseHandler):
                 room_id, max_topo
             )

-            events, next_key = yield self.store.paginate_room_events(
+            events, next_key, extremities = yield self.store.paginate_room_events(
                 room_id=room_id,
                 from_key=source_config.from_key,
                 to_key=source_config.to_key,
@@ -514,7 +514,8 @@ class RoomEventSource(object):
         events = list(room_events)
         events.extend(e for evs, _ in room_to_events.values() for e in evs)

-        events.sort(key=lambda e: e.internal_metadata.order)
+        # Order by the stream ordering of the events.
+        events.sort(key=lambda e: e.internal_metadata.stream_ordering)

         if limit:
             events[:] = events[:limit]

@@ -534,7 +535,7 @@ class RoomEventSource(object):

     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
-        events, next_key = yield self.store.paginate_room_events(
+        events, next_key, _ = yield self.store.paginate_room_events(
             room_id=key,
             from_key=config.from_key,
             to_key=config.to_key,
@@ -131,6 +131,7 @@ class DataStore(RoomMemberStore, RoomStore,
         self._group_updates_id_gen = StreamIdGenerator(
             db_conn, "local_group_updates", "stream_id",
         )
+        self._chunk_id_gen = IdGenerator(db_conn, "events", "chunk_id")

         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen = StreamIdGenerator(
synapse/storage/chunk_ordered_table.py (new file, 577 lines)
@@ -0,0 +1,577 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import logging

from fractions import Fraction

from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
from synapse.util.katriel_bodlaender import OrderedListStore

import synapse.metrics

metrics = synapse.metrics.get_metrics_for(__name__)
rebalance_counter = metrics.register_counter("rebalances")


logger = logging.getLogger(__name__)


class ChunkDBOrderedListStore(OrderedListStore):
    """Used as the list store for room chunks, efficiently maintaining them in
    topological order on updates.

    A room chunk is a connected portion of the room events DAG. Chunks are
    constructed so that they have the additional property that, for all events
    in the chunk, either all of their prev_events are in that chunk or none of
    them are. This ensures that no event that is subsequently received needs
    to be inserted into the middle of a chunk, since it cannot both reference
    an event in the chunk and be referenced by an event in the chunk (assuming
    no cycles).

    As such the set of chunks in a room inherits a DAG: if an event in one
    chunk references an event in a second chunk, then we say that the first
    chunk references the second, and so the chunks themselves form a DAG.
    (This means that chunks start off disconnected until an event is received
    that connects the two chunks.)

    We can therefore end up with multiple chunks in a room when the server
    misses some events, e.g. due to the server being offline for a time.

    The server may only have a subset of all events in a room, in which case
    it's possible for the server to have chunks that are unconnected from each
    other. The ordering between unconnected chunks is arbitrary.

    The class is designed for use inside transactions and so takes a
    transaction object in the constructor. This means that it needs to be
    re-instantiated in each transaction, so all state needs to be stored
    in the database.

    Internally the ordering is implemented using a linked list and by
    assigning each chunk a fraction. `get_next` and `get_prev` are implemented
    via the linked list, and comparisons are implemented using the fractions.
    When inserting chunks, fractions are picked such that their denominator is
    the smallest possible. However, if the denominators grow too big then a
    rebalancing has to take place to reduce the denominators; see `_rebalance`
    for details.

    Note that OrderedListStore orders nodes such that the source of an edge
    comes before the target. This is counterintuitive when edges represent
    causality, so for the purposes of the ordering algorithm we invert the
    edge directions, i.e. if chunk A has a prev chunk of B then we say that
    the edge is from B to A. This ensures that newer chunks get inserted at
    the end (rather than the start).

    Note: Calls to `add_node` and `add_edge` cannot overlap for the same room,
    and so callers should perform some form of per-room locking when using
    this class.

    Args:
        txn
        room_id (str)
        clock
        database_engine
        rebalance_max_denominator (int): When a rebalance is triggered we
            replace existing orders with those that have a denominator smaller
            than or equal to this
        max_denominator (int): A rebalance is triggered when a node has an
            ordering with a denominator greater than this
    """
    def __init__(self,
                 txn, room_id, clock, database_engine,
                 rebalance_max_denominator=100,
                 max_denominator=100000):
        self.txn = txn
        self.room_id = room_id
        self.clock = clock
        self.database_engine = database_engine

        self.rebalance_md = rebalance_max_denominator
        self.max_denominator = max_denominator

    def is_before(self, a, b):
        """Implements OrderedListStore"""
        return self._get_order(a) < self._get_order(b)

    def get_prev(self, node_id):
        """Implements OrderedListStore"""

        sql = """
            SELECT chunk_id FROM chunk_linearized
            WHERE next_chunk_id = ?
        """

        self.txn.execute(sql, (node_id,))

        row = self.txn.fetchone()
        if row:
            return row[0]
        return None

    def get_next(self, node_id):
        """Implements OrderedListStore"""

        sql = """
            SELECT next_chunk_id FROM chunk_linearized
            WHERE chunk_id = ?
        """

        self.txn.execute(sql, (node_id,))

        row = self.txn.fetchone()
        if row:
            return row[0]
        return None

    def _insert_before(self, node_id, target_id):
        """Implements OrderedListStore"""

        rebalance = False  # Set to true if we need to trigger a rebalance

        if target_id:
            before_id = self.get_prev(target_id)
            if before_id:
                new_order = self._insert_between(node_id, before_id, target_id)
            else:
                new_order = self._insert_at_start(node_id, target_id)
        else:
            # If target_id is None then we insert at the end.
            self.txn.execute("""
                SELECT chunk_id
                FROM chunk_linearized
                WHERE room_id = ? AND next_chunk_id is NULL
            """, (self.room_id,))

            row = self.txn.fetchone()
            if row:
                new_order = self._insert_at_end(node_id, row[0])
            else:
                new_order = self._insert_first(node_id)

        rebalance = new_order.denominator > self.max_denominator

        if rebalance:
            self._rebalance(node_id)

    def _insert_after(self, node_id, target_id):
        """Implements OrderedListStore"""

        rebalance = False  # Set to true if we need to trigger a rebalance

        next_chunk_id = None
        if target_id:
            next_chunk_id = self.get_next(target_id)
            if next_chunk_id:
                new_order = self._insert_between(node_id, target_id, next_chunk_id)
            else:
                new_order = self._insert_at_end(node_id, target_id)
        else:
            # If target_id is None then we insert at the start.
            self.txn.execute("""
                SELECT chunk_id
                FROM chunk_linearized
                NATURAL JOIN chunk_linearized_first
                WHERE room_id = ?
            """, (self.room_id,))

            row = self.txn.fetchone()
            if row:
                new_order = self._insert_at_start(node_id, row[0])
            else:
                new_order = self._insert_first(node_id)

        rebalance = new_order.denominator > self.max_denominator

        if rebalance:
            self._rebalance(node_id)

    def _insert_between(self, node_id, left_id, right_id):
        """Inserts node between given existing nodes.
        """

        left_order = self._get_order(left_id)
        right_order = self._get_order(right_id)

        assert left_order < right_order

        new_order = get_fraction_in_range(left_order, right_order)

        SQLBaseStore._simple_update_one_txn(
            self.txn,
            table="chunk_linearized",
            keyvalues={"chunk_id": left_id},
            updatevalues={"next_chunk_id": node_id},
        )

        SQLBaseStore._simple_insert_txn(
            self.txn,
            table="chunk_linearized",
            values={
                "chunk_id": node_id,
                "room_id": self.room_id,
                "next_chunk_id": right_id,
                "numerator": int(new_order.numerator),
                "denominator": int(new_order.denominator),
            }
        )

        return new_order

    def _insert_at_end(self, node_id, last_id):
        """Inserts node at the end using existing last node.
        """

        last_order = self._get_order(last_id)
        new_order = Fraction(int(math.ceil(last_order)) + 1, 1)

        SQLBaseStore._simple_update_one_txn(
            self.txn,
            table="chunk_linearized",
            keyvalues={"chunk_id": last_id},
            updatevalues={"next_chunk_id": node_id},
        )

        SQLBaseStore._simple_insert_txn(
            self.txn,
            table="chunk_linearized",
            values={
                "chunk_id": node_id,
                "room_id": self.room_id,
                "next_chunk_id": None,
                "numerator": int(new_order.numerator),
                "denominator": int(new_order.denominator),
            }
        )

        return new_order

    def _insert_at_start(self, node_id, first_id):
        """Inserts node at the start using existing first node.
        """

        first_order = self._get_order(first_id)
        new_order = get_fraction_in_range(0, first_order)

        SQLBaseStore._simple_update_one_txn(
            self.txn,
            table="chunk_linearized_first",
            keyvalues={"room_id": self.room_id},
            updatevalues={"chunk_id": node_id},
        )

        SQLBaseStore._simple_insert_txn(
            self.txn,
            table="chunk_linearized",
            values={
                "chunk_id": node_id,
                "room_id": self.room_id,
                "next_chunk_id": first_id,
                "numerator": int(new_order.numerator),
                "denominator": int(new_order.denominator),
            }
        )

        return new_order

    def _insert_first(self, node_id):
        """Inserts the first node for this room.
        """

        SQLBaseStore._simple_insert_txn(
            self.txn,
            table="chunk_linearized_first",
            values={
                "room_id": self.room_id,
                "chunk_id": node_id,
            },
        )

        SQLBaseStore._simple_insert_txn(
            self.txn,
            table="chunk_linearized",
            values={
                "chunk_id": node_id,
                "room_id": self.room_id,
                "next_chunk_id": None,
                "numerator": 1,
                "denominator": 1,
            }
        )

        return Fraction(1, 1)

    def get_nodes_with_edges_to(self, node_id):
        """Implements OrderedListStore"""

        # Note that we use the inverse relation here
        sql = """
            SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
            INNER JOIN chunk_linearized AS l ON g.prev_id = l.chunk_id
            WHERE g.chunk_id = ?
        """
        self.txn.execute(sql, (node_id,))
        return [(Fraction(n, d), c) for c, n, d in self.txn]

    def get_nodes_with_edges_from(self, node_id):
        """Implements OrderedListStore"""

        # Note that we use the inverse relation here
        sql = """
            SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
            INNER JOIN chunk_linearized AS l ON g.chunk_id = l.chunk_id
            WHERE g.prev_id = ?
        """
        self.txn.execute(sql, (node_id,))
        return [(Fraction(n, d), c) for c, n, d in self.txn]

    def _delete_ordering(self, node_id):
        """Implements OrderedListStore"""

        next_chunk_id = SQLBaseStore._simple_select_one_onecol_txn(
            self.txn,
            table="chunk_linearized",
            keyvalues={
                "chunk_id": node_id,
            },
            retcol="next_chunk_id",
        )

        SQLBaseStore._simple_delete_txn(
            self.txn,
            table="chunk_linearized",
            keyvalues={"chunk_id": node_id},
        )

        sql = """
            UPDATE chunk_linearized SET next_chunk_id = ?
            WHERE next_chunk_id = ?
        """

        self.txn.execute(sql, (next_chunk_id, node_id,))

        sql = """
            UPDATE chunk_linearized_first SET chunk_id = ?
            WHERE chunk_id = ?
        """

        self.txn.execute(sql, (next_chunk_id, node_id,))

    def _add_edge_to_graph(self, source_id, target_id):
        """Implements OrderedListStore"""

        # Note that we use the inverse relation
        SQLBaseStore._simple_insert_txn(
            self.txn,
            table="chunk_graph",
            values={"chunk_id": target_id, "prev_id": source_id}
        )

    def _get_order(self, node_id):
        """Get the ordering of the given node.
        """

        row = SQLBaseStore._simple_select_one_txn(
            self.txn,
            table="chunk_linearized",
            keyvalues={"chunk_id": node_id},
            retcols=("numerator", "denominator",),
        )
        return Fraction(row["numerator"], row["denominator"])

    def _rebalance(self, node_id):
        """Rebalances the list around the given node to ensure that the
        ordering denominators aren't too big.

        This is done by starting at the given chunk and generating new orders
        based on a Farey sequence of order `self.rebalance_md` for all
        subsequent chunks that have an order less than that of the ordering
        generated by the Farey sequence.

        For example, say we have chunks (and orders): A (23/90), B (24/91) and
        C (2/3), and we have rebalance_md set to 5; a rebalancing would produce:

            A: 23/90 -> 1/3
            B: 24/91 -> 2/5
            C: 2/3 (no change)

        since the Farey sequence of order 5 is 1/5, 1/4, 1/3, 2/5, 1/2, ...
        and 1/3 is its smallest term greater than 23/90.

        Note that we've extended the Farey sequence to be infinite by
        repeating the sequence with an added integer. For example, the
        sequence with order 3 is:

            0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ...
        """

        logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id)

        old_order = self._get_order(node_id)

        a, b, c, d = find_farey_terms(old_order, self.rebalance_md)
        assert old_order < Fraction(a, b)
        assert b + d > self.rebalance_md

        # Since we can easily produce Farey sequence terms with an iterative
        # algorithm, we can use WITH RECURSIVE to do so. This is less clear
        # than doing it in python, but saves us being killed by the RTT to the
        # DB if we need to rebalance a large number of nodes.
        with_sql = """
            WITH RECURSIVE chunks (chunk_id, next, n, a, b, c, d) AS (
                SELECT chunk_id, next_chunk_id, ?, ?, ?, ?, ?
                FROM chunk_linearized WHERE chunk_id = ?
                UNION ALL
                SELECT n.chunk_id, n.next_chunk_id, n,
                    c, d, ((n + b) / d) * c - a, ((n + b) / d) * d - b
                FROM chunks AS c
                INNER JOIN chunk_linearized AS l ON l.chunk_id = c.chunk_id
                INNER JOIN chunk_linearized AS n ON n.chunk_id = l.next_chunk_id
                WHERE c * 1.0 / d > n.numerator * 1.0 / n.denominator
            )
        """

        # Annoyingly, postgres 9.4 doesn't support the standard SQL subquery
        # syntax for updates.
        if isinstance(self.database_engine, PostgresEngine):
            sql = with_sql + """
                UPDATE chunk_linearized AS l
                SET numerator = a, denominator = b
                FROM chunks AS c
                WHERE c.chunk_id = l.chunk_id
            """
        else:
            sql = with_sql + """
                UPDATE chunk_linearized
                SET (numerator, denominator) = (
                    SELECT a, b FROM chunks
                    WHERE chunks.chunk_id = chunk_linearized.chunk_id
                )
                WHERE chunk_id in (SELECT chunk_id FROM chunks)
            """

        self.txn.execute(sql, (
            self.rebalance_md, a, b, c, d, node_id
        ))

        logger.info("Rebalanced %d chunks in room %s", self.txn.rowcount, self.room_id)

        rebalance_counter.inc()


def get_fraction_in_range(min_frac, max_frac):
    """Gets a fraction in between the given numbers.

    Uses the Stern-Brocot tree to generate the fraction with the smallest
    denominator.

    See https://en.wikipedia.org/wiki/Stern%E2%80%93Brocot_tree

    Args:
        min_frac (numbers.Rational)
        max_frac (numbers.Rational)

    Returns:
        numbers.Rational
    """

    assert 0 <= min_frac < max_frac

    # If the determinant is 1 then the fraction with smallest numerator and
    # denominator in the range is the mediant, so we don't have to use the
    # Stern-Brocot tree to search for it.
    determinant = (
        min_frac.denominator * max_frac.numerator
        - min_frac.numerator * max_frac.denominator
    )

    if determinant == 1:
        return Fraction(
            min_frac.numerator + max_frac.numerator,
            min_frac.denominator + max_frac.denominator,
        )

    # This works by tracking two fractions a/b and c/d and repeatedly replacing
    # one of them with their mediant, depending on if the mediant is smaller
    # or greater than the specified range.
    a, b, c, d = 0, 1, 1, 0

    while True:
        f = Fraction(a + c, b + d)

        if f <= min_frac:
            a, b, c, d = a + c, b + d, c, d
        elif min_frac < f < max_frac:
            return f
        else:
            a, b, c, d = a, b, a + c, b + d


def find_farey_terms(min_frac, max_denom):
    """Find the smallest pair of neighbouring fractions in the Farey sequence
    of order `max_denom` (the ordered sequence of all fractions with
    denominator less than or equal to `max_denom`) that are strictly greater
    than `min_frac`.

    This is useful as it can be fed into a simple iterative algorithm to
    generate subsequent entries in the sequence.

    A pair of fractions a/b, c/d are neighbours in the sequence of order
    max(b, d) if and only if their determinant is one, i.e. bc - ad = 1. Note
    that the next order sequence is generated by taking the mediants of the
    previous order, so a/b and c/d are neighbours in all sequences with orders
    between max(b, d) and b + d.

    We can therefore use the Stern-Brocot tree to find the closest pair of
    fractions to min_frac such that b + d is strictly greater than max_denom,
    since all neighbouring fractions in Stern-Brocot satisfy the necessary
    determinant property.

    Note that we've extended the Farey sequence to be infinite by repeating
    the sequence with an added integer. For example, the sequence with order
    3 is:

        0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ...

    See https://en.wikipedia.org/wiki/Farey_sequence

    Args:
        min_frac (numbers.Rational)
        max_denom (int)

    Returns:
        tuple[int, int, int, int]: (a, b, c, d) such that a/b and c/d are the
        two terms
    """

    a, b, c, d = 0, 1, 1, 0

    while True:
        cur_frac = Fraction(a + c, b + d)

        if b + d > max_denom:
            break

        if cur_frac <= min_frac:
            a, b, c, d = a + c, b + d, c, d
        elif min_frac < cur_frac:
            a, b, c, d = a, b, a + c, b + d

    # a/b may be smaller than min_frac, so we run the algorithm to generate
    # next Farey sequence terms until a/b is strictly greater than min_frac
    while Fraction(a, b) <= min_frac:
        k = int((max_denom + b) / d)
        a, b, c, d = c, d, k * c - a, k * d - b

    assert min_frac < Fraction(a, b) < Fraction(c, d)
    assert b * c - a * d == 1

    return a, b, c, d
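As a quick sanity check of the two helpers above, here is a standalone sketch (assuming `get_fraction_in_range` and `find_farey_terms` from `chunk_ordered_table.py` are in scope; the values follow directly from the algorithms as written):

```python
from fractions import Fraction

# 1/3 and 1/2 have determinant 3*1 - 1*2 = 1, so the fraction with the
# smallest denominator strictly between them is simply their mediant:
# (1 + 1) / (3 + 2) = 2/5.
assert get_fraction_in_range(Fraction(1, 3), Fraction(1, 2)) == Fraction(2, 5)

# The Farey sequence of order 5 runs 1/5, 1/4, 1/3, 2/5, 1/2, ... so the
# smallest neighbouring pair strictly greater than 1/3 is 2/5, 1/2.
a, b, c, d = find_farey_terms(Fraction(1, 3), 5)
assert (a, b, c, d) == (2, 5, 1, 2)
assert b * c - a * d == 1  # neighbouring Farey terms have determinant 1
```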
@@ -23,6 +23,7 @@ import simplejson as json
 from twisted.internet import defer

 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
 from synapse.util.async import ObservableDeferred
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import (
@@ -201,6 +202,7 @@ def _retry_on_integrity_error(func):
 class EventsStore(EventsWorkerStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+    EVENT_FIELDS_CHUNK = "event_fields_chunk_id"

     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
@@ -232,6 +234,20 @@ class EventsStore(EventsWorkerStore):
             psql_only=True,
         )

+        self.register_background_index_update(
+            "events_chunk_index",
+            index_name="events_chunk_index",
+            table="events",
+            columns=["room_id", "chunk_id", "topological_ordering", "stream_ordering"],
+            unique=True,
+            psql_only=True,
+        )
+
+        self.register_background_update_handler(
+            self.EVENT_FIELDS_CHUNK,
+            self._background_compute_chunks,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()

         self._state_resolution_handler = hs.get_state_resolution_handler()
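`register_background_index_update` with `psql_only=True` builds the index lazily once the server is running; assuming Synapse's usual behaviour of using `CREATE INDEX CONCURRENTLY` on Postgres, the statement it eventually issues for the registration above would look roughly like this (a sketch, not taken from the diff):

```python
# Hypothetical: approximately the DDL the background update runs on Postgres
# for the "events_chunk_index" registration (unique index, built concurrently
# to avoid blocking writes to the events table).
sql = (
    "CREATE UNIQUE INDEX CONCURRENTLY events_chunk_index"
    " ON events (room_id, chunk_id, topological_ordering, stream_ordering)"
)
```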
@@ -1010,13 +1026,20 @@ class EventsStore(EventsWorkerStore):
                 }
             )

-            sql = (
-                "UPDATE events SET outlier = ?"
-                " WHERE event_id = ?"
-            )
-            txn.execute(
-                sql,
-                (False, event.event_id,)
+            chunk_id, topo = self._insert_into_chunk_txn(
+                txn, event.room_id, event.event_id,
+                [eid for eid, _ in event.prev_events],
+            )
+
+            self._simple_update_txn(
+                txn,
+                table="events",
+                keyvalues={"event_id": event.event_id},
+                updatevalues={
+                    "outlier": False,
+                    "chunk_id": chunk_id,
+                    "topological_ordering": topo,
+                },
             )

             # Update the event_backward_extremities table now that this
@@ -1099,13 +1122,22 @@ class EventsStore(EventsWorkerStore):
             ],
         )

-        self._simple_insert_many_txn(
-            txn,
-            table="events",
-            values=[
-                {
+        for event, _ in events_and_contexts:
+            if event.internal_metadata.is_outlier():
+                chunk_id, topo = None, 0
+            else:
+                chunk_id, topo = self._insert_into_chunk_txn(
+                    txn, event.room_id, event.event_id,
+                    [eid for eid, _ in event.prev_events],
+                )
+
+            self._simple_insert_txn(
+                txn,
+                table="events",
+                values={
                     "stream_ordering": event.internal_metadata.stream_ordering,
-                    "topological_ordering": event.depth,
+                    "chunk_id": chunk_id,
+                    "topological_ordering": topo,
                     "depth": event.depth,
                     "event_id": event.event_id,
                     "room_id": event.room_id,
@@ -1120,10 +1152,8 @@ class EventsStore(EventsWorkerStore):
                         "url" in event.content
                         and isinstance(event.content["url"], basestring)
                     ),
-            }
-            for event, _ in events_and_contexts
-        ],
-    )
+                },
+            )

     def _store_rejected_events_txn(self, txn, events_and_contexts):
         """Add rows to the 'rejections' table for received events which were
@@ -1335,6 +1365,177 @@ class EventsStore(EventsWorkerStore):
                 (event.event_id, event.redacts)
             )

+    def _insert_into_chunk_txn(self, txn, room_id, event_id, prev_event_ids):
+        """Computes the chunk ID and topological ordering for an event and
+        handles updating the chunk_graph table.
+
+        Args:
+            txn
+            room_id (str)
+            event_id (str)
+            prev_event_ids (list[str])
+
+        Returns:
+            tuple[int, int]: Returns the chunk_id, topological_ordering for
+            the event
+        """
+
+        # We calculate the chunk for an event using the following rules:
+        #
+        # 1. If all prev events have the same chunk ID then use that chunk ID
+        # 2. If we have none of the prev events but do have events pointing to
+        #    the event, then we use their chunk ID if:
+        #    - They're all in the same chunk, and
+        #    - All their prev events match the events being inserted
+        # 3. Otherwise, create a new chunk and use that
+
+        # Set of chunks that the event refers to. Includes None if there were
+        # prev events that we don't have (or don't have a chunk for)
+        prev_chunk_ids = set()
+
+        for eid in prev_event_ids:
+            chunk_id = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={"event_id": eid},
+                retcol="chunk_id",
+                allow_none=True,
+            )
+
+            prev_chunk_ids.add(chunk_id)
+
+        forward_events = self._simple_select_onecol_txn(
+            txn,
+            table="event_edges",
+            keyvalues={
+                "prev_event_id": event_id,
+                "is_state": False,
+            },
+            retcol="event_id",
+        )
+
+        # Set of chunks that refer to this event.
+        forward_chunk_ids = set()
+
+        # All the prev_events of events in `forward_events`.
+        # Note that this will include the current event_id.
+        sibling_events = set()
+        for eid in forward_events:
+            chunk_id = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={"event_id": eid},
+                retcol="chunk_id",
+                allow_none=True,
+            )
+
+            if chunk_id is not None:
+                # chunk_id can be None if it's an outlier
+                forward_chunk_ids.add(chunk_id)
+
+            pes = self._simple_select_onecol_txn(
+                txn,
+                table="event_edges",
+                keyvalues={
+                    "event_id": eid,
+                    "is_state": False,
+                },
+                retcol="prev_event_id",
+            )
+
+            sibling_events.update(pes)
+
+        table = ChunkDBOrderedListStore(
+            txn, room_id, self.clock, self.database_engine,
+        )
+
+        # If there is only one previous chunk (and that isn't None), then this
+        # satisfies condition one.
+        if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids:
+            chunk_id = list(prev_chunk_ids)[0]
+
+            # This event is being inserted at the end of the chunk
+            new_topo = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "room_id": room_id,
+                    "chunk_id": chunk_id,
+                },
+                retcol="MAX(topological_ordering)",
+            )
+            new_topo += 1
+
+        # If there is only one forward chunk and only one sibling event (which
+        # would be the given event), then this satisfies condition two.
+        elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1:
+            chunk_id = list(forward_chunk_ids)[0]
+
+            # This event is being inserted at the start of the chunk
+            new_topo = self._simple_select_one_onecol_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "room_id": room_id,
+                    "chunk_id": chunk_id,
+                },
+                retcol="MIN(topological_ordering)",
+            )
+            new_topo -= 1
+        else:
+            chunk_id = self._chunk_id_gen.get_next()
+            new_topo = 0
+
+            # We've generated a new chunk, so we have to tell the
+            # ChunkDBOrderedListStore about that.
+            table.add_node(chunk_id)
+
+        # We now need to update the database with any new edges between chunks
+        current_prev_ids = self._simple_select_onecol_txn(
+            txn,
+            table="chunk_graph",
+            keyvalues={
+                "chunk_id": chunk_id,
+            },
+            retcol="prev_id",
+        )
+
+        current_forward_ids = self._simple_select_onecol_txn(
+            txn,
+            table="chunk_graph",
+            keyvalues={
+                "prev_id": chunk_id,
+            },
+            retcol="chunk_id",
+        )
+
+        for pid in prev_chunk_ids:
+            if pid is not None and pid not in current_prev_ids and pid != chunk_id:
+                # Note that the edge direction is reversed from what you might
+                # expect. See ChunkDBOrderedListStore for more details.
+                table.add_edge(pid, chunk_id)
+
+        for fid in forward_chunk_ids:
+            # Note that the edge direction is reversed from what you might
+            # expect. See ChunkDBOrderedListStore for more details.
+            if fid not in current_forward_ids and fid != chunk_id:
+                table.add_edge(chunk_id, fid)
+
+        # We now need to update the backwards extremities for the chunks.
+
+        txn.executemany("""
+            INSERT INTO chunk_backwards_extremities (chunk_id, event_id)
+            SELECT ?, ? WHERE ? NOT IN (SELECT event_id FROM events)
+        """, [(chunk_id, eid, eid) for eid in prev_event_ids])
+
+        self._simple_delete_txn(
+            txn,
+            table="chunk_backwards_extremities",
+            keyvalues={"event_id": event_id},
+        )
+
+        return chunk_id, new_topo
+
     @defer.inlineCallbacks
     def have_events_in_timeline(self, event_ids):
         """Given a list of event ids, check if we have already processed and
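The three rules in `_insert_into_chunk_txn` can be exercised in isolation. Here is a toy model of just the decision logic (a hypothetical helper, not Synapse code, with `new_chunk` standing in for `self._chunk_id_gen.get_next()`):

```python
def assign_chunk(prev_chunk_ids, forward_chunk_ids, sibling_events, new_chunk):
    if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids:
        # Rule 1: all prev events share a chunk; append to the end of it.
        return next(iter(prev_chunk_ids)), "end"
    elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1:
        # Rule 2: the event is referenced only from one chunk whose events
        # have no other prev events; prepend to the start of that chunk.
        return next(iter(forward_chunk_ids)), "start"
    else:
        # Rule 3: otherwise open a brand new chunk.
        return new_chunk, "new"

# An event whose sole known prev event lives in chunk 7 extends chunk 7:
assert assign_chunk({7}, set(), set(), 99) == (7, "end")
# A backfilled event (prev events unknown, hence {None}) referenced only
# from chunk 7 is prepended to chunk 7:
assert assign_chunk({None}, {7}, {"$this_event"}, 99) == (7, "start")
# Prev events spanning two chunks force a new chunk:
assert assign_chunk({3, 7}, set(), set(), 99) == (99, "new")
```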
@@ -1628,6 +1829,72 @@ class EventsStore(EventsWorkerStore):

         defer.returnValue(result)

+    @defer.inlineCallbacks
+    def _background_compute_chunks(self, progress, batch_size):
+        """Iterates over events and assigns them chunk IDs.
+        """
+
+        up_to_stream_id = progress.get("up_to_stream_id")
+        if up_to_stream_id is None:
+            up_to_stream_id = self.get_current_events_token() + 1
+
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        def reindex_chunks_txn(txn):
+            txn.execute("""
+                SELECT stream_ordering, room_id, event_id FROM events
+                WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL
+                ORDER BY stream_ordering DESC
+                LIMIT ?
+            """, (up_to_stream_id, False, batch_size))
+
+            rows = txn.fetchall()
+
+            stream_ordering = up_to_stream_id
+            for stream_ordering, room_id, event_id in rows:
+                prev_events = self._simple_select_onecol_txn(
+                    txn,
+                    table="event_edges",
+                    keyvalues={
+                        "event_id": event_id,
+                    },
+                    retcol="prev_event_id",
+                )
+
+                chunk_id, topo = self._insert_into_chunk_txn(
+                    txn, room_id, event_id, prev_events,
+                )
+
+                self._simple_update_txn(
+                    txn,
+                    table="events",
+                    keyvalues={"event_id": event_id},
+                    updatevalues={
+                        "chunk_id": chunk_id,
+                        "topological_ordering": topo,
+                    },
+                )
+
+            progress = {
+                "up_to_stream_id": stream_ordering,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_FIELDS_CHUNK, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_FIELDS_CHUNK, reindex_chunks_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_FIELDS_CHUNK)
+
+        defer.returnValue(result)
+
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
synapse/storage/schema/delta/49/event_chunks.py (new file, 150 lines)
@@ -0,0 +1,150 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.storage._base import SQLBaseStore, LoggingTransaction
from synapse.storage.prepare_database import get_statements

SQL = """

ALTER TABLE events ADD COLUMN chunk_id BIGINT;

-- FIXME: Add index on contains_url

INSERT INTO background_updates (update_name, progress_json) VALUES
    ('events_chunk_index', '{}');

-- Stores how chunks of the graph relate to each other
CREATE TABLE chunk_graph (
    chunk_id BIGINT NOT NULL,
    prev_id BIGINT NOT NULL
);

CREATE UNIQUE INDEX chunk_graph_id ON chunk_graph (chunk_id, prev_id);
CREATE INDEX chunk_graph_prev_id ON chunk_graph (prev_id);

-- The extremities in each chunk. Note that these point to events that
-- we don't have, rather than to the boundary between chunks.
CREATE TABLE chunk_backwards_extremities (
    chunk_id BIGINT NOT NULL,
    event_id TEXT NOT NULL
);

CREATE INDEX chunk_backwards_extremities_id ON chunk_backwards_extremities(
    chunk_id, event_id
);
CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities(
    event_id
);

-- Maintains an absolute ordering of chunks. Gets updated when we see new
-- edges between chunks.
CREATE TABLE chunk_linearized (
    chunk_id BIGINT NOT NULL,
    room_id TEXT NOT NULL,
    next_chunk_id BIGINT,  -- The chunk directly after this chunk, or NULL if last chunk
    numerator BIGINT NOT NULL,
    denominator BIGINT NOT NULL
);

CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
CREATE UNIQUE INDEX chunk_linearized_next_id ON chunk_linearized (
    next_chunk_id, room_id
);

-- Records the first chunk in a room.
CREATE TABLE chunk_linearized_first (
    chunk_id BIGINT NOT NULL,
    room_id TEXT NOT NULL
);

CREATE UNIQUE INDEX chunk_linearized_first_id ON chunk_linearized_first (room_id);

INSERT into background_updates (update_name, progress_json)
    VALUES ('event_fields_chunk_id', '{}');

"""


def run_create(cur, database_engine, *args, **kwargs):
    for statement in get_statements(SQL.splitlines()):
        cur.execute(statement)

    txn = LoggingTransaction(
        cur, "schema_update", database_engine, [], [],
    )

    rows = SQLBaseStore._simple_select_list_txn(
        txn,
        table="event_forward_extremities",
        keyvalues={},
        retcols=("event_id", "room_id",),
    )

    next_chunk_id = 1
    room_to_next_order = {}
    prev_chunks_by_room = {}

    for row in rows:
        chunk_id = next_chunk_id
        next_chunk_id += 1

        room_id = row["room_id"]
        event_id = row["event_id"]

        SQLBaseStore._simple_update_txn(
            txn,
            table="events",
            keyvalues={"room_id": room_id, "event_id": event_id},
            updatevalues={"chunk_id": chunk_id},
        )

        ordering = room_to_next_order.get(room_id, 1)
        room_to_next_order[room_id] = ordering + 1

        prev_chunks = prev_chunks_by_room.setdefault(room_id, [])

        SQLBaseStore._simple_insert_txn(
            txn,
            table="chunk_linearized",
            values={
                "chunk_id": chunk_id,
                "room_id": row["room_id"],
                "numerator": ordering,
                "denominator": 1,
            },
        )

        if prev_chunks:
            SQLBaseStore._simple_update_one_txn(
                txn,
                table="chunk_linearized",
                keyvalues={"chunk_id": prev_chunks[-1]},
                updatevalues={"next_chunk_id": chunk_id},
            )
        else:
            SQLBaseStore._simple_insert_txn(
                txn,
                table="chunk_linearized_first",
                values={
                    "chunk_id": chunk_id,
                    "room_id": row["room_id"],
                },
            )

        prev_chunks.append(chunk_id)


def run_upgrade(*args, **kwargs):
    pass
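Since each chunk's `numerator`/`denominator` pair encodes a rational position, the linearization can be read back in order. A sketch of such a query (assumed for illustration, not part of the delta; `txn` and `room_id` would come from a surrounding transaction):

```python
# Order chunks by the fraction numerator/denominator to recover the
# room's linearized (topological) chunk order.
sql = """
    SELECT chunk_id FROM chunk_linearized
    WHERE room_id = ?
    ORDER BY numerator * 1.0 / denominator ASC
"""
txn.execute(sql, (room_id,))
ordered_chunk_ids = [row[0] for row in txn]
```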
@@ -14,7 +14,12 @@
 */

 CREATE TABLE IF NOT EXISTS events(
+    -- Defines an ordering used to stream new events to clients. Events
+    -- fetched via backfill have negative values.
     stream_ordering INTEGER PRIMARY KEY,
+    -- Defines a topological ordering of events within a chunk
+    -- (The concept of a chunk was added in later schemas, this used to
+    -- be set to the same value as the `depth` field in an event)
     topological_ordering BIGINT NOT NULL,
     event_id TEXT NOT NULL,
     type TEXT NOT NULL,
@@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
 from synapse.storage.engines import PostgresEngine

 import abc
@@ -62,24 +63,25 @@ _TOPOLOGICAL_TOKEN = "topological"
|
|||||||
|
|
||||||
# Used as return values for pagination APIs
|
# Used as return values for pagination APIs
|
||||||
_EventDictReturn = namedtuple("_EventDictReturn", (
|
_EventDictReturn = namedtuple("_EventDictReturn", (
|
||||||
"event_id", "topological_ordering", "stream_ordering",
|
"event_id", "chunk_id", "topological_ordering", "stream_ordering",
|
||||||
))
|
))
|
||||||
|
|
||||||
|
|
||||||
def lower_bound(token, engine, inclusive=False):
|
def lower_bound(token, engine, inclusive=False):
|
||||||
inclusive = "=" if inclusive else ""
|
inclusive = "=" if inclusive else ""
|
||||||
if token.topological is None:
|
if token.chunk is None:
|
||||||
return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
|
return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
|
||||||
else:
|
else:
|
||||||
if isinstance(engine, PostgresEngine):
|
if isinstance(engine, PostgresEngine):
|
||||||
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
|
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
|
||||||
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
|
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
|
||||||
# use the later form when running against postgres.
|
# use the later form when running against postgres.
|
||||||
return "((%d,%d) <%s (%s,%s))" % (
|
return "(chunk_id = %d AND (%d,%d) <%s (%s,%s))" % (
|
||||||
token.topological, token.stream, inclusive,
|
token.chunk, token.topological, token.stream, inclusive,
|
||||||
"topological_ordering", "stream_ordering",
|
"topological_ordering", "stream_ordering",
|
||||||
)
|
)
|
||||||
return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
|
return "(chunk_id = %d AND (%d < %s OR (%d = %s AND %d <%s %s)))" % (
|
||||||
|
token.chunk,
|
||||||
token.topological, "topological_ordering",
|
token.topological, "topological_ordering",
|
||||||
token.topological, "topological_ordering",
|
token.topological, "topological_ordering",
|
||||||
token.stream, inclusive, "stream_ordering",
|
token.stream, inclusive, "stream_ordering",
|
||||||
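With the extra field, call sites that query purely by stream ordering now pass `None` for both the chunk and topological slots, as the `_EventDictReturn(row[0], None, None, row[1])` changes further down do. A minimal illustration (the namedtuple definition mirrors the widened one above):

```python
from collections import namedtuple

_EventDictReturn = namedtuple("_EventDictReturn", (
    "event_id", "chunk_id", "topological_ordering", "stream_ordering",
))

# A stream-ordered row carries no chunk or topological position.
row = _EventDictReturn("$event_id:example.org", None, None, 1234)
assert row.chunk_id is None and row.stream_ordering == 1234
```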
@@ -88,18 +90,19 @@ def lower_bound(token, engine, inclusive=False):

 def upper_bound(token, engine, inclusive=True):
     inclusive = "=" if inclusive else ""
-    if token.topological is None:
+    if token.chunk is None:
         return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
         if isinstance(engine, PostgresEngine):
             # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
             # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
             # use the latter form when running against postgres.
-            return "((%d,%d) >%s (%s,%s))" % (
-                token.topological, token.stream, inclusive,
+            return "(chunk_id = %d AND (%d,%d) >%s (%s,%s))" % (
+                token.chunk, token.topological, token.stream, inclusive,
                 "topological_ordering", "stream_ordering",
             )
-        return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
+        return "(chunk_id = %d AND (%d > %s OR (%d = %s AND %d >%s %s)))" % (
+            token.chunk,
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
             token.stream, inclusive, "stream_ordering",
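To make the change concrete, here is what the Postgres branch of `upper_bound` renders for a hypothetical token with chunk 3, topological ordering 10 and stream ordering 250, using an inclusive comparison (the format string and arguments below are exactly those of the new code):

```python
clause = "(chunk_id = %d AND (%d,%d) >%s (%s,%s))" % (
    3, 10, 250, "=",
    "topological_ordering", "stream_ordering",
)
assert clause == "(chunk_id = 3 AND (10,250) >= (topological_ordering,stream_ordering))"
```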
@@ -275,7 +278,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             ) % (order,)
             txn.execute(sql, (room_id, from_id, to_id, limit))

-            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
+            rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
             return rows

         rows = yield self.runInteraction("get_room_events_stream_for_room", f)
@@ -325,7 +328,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             )
             txn.execute(sql, (user_id, from_id, to_id,))

-            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
+            rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]

             return rows

@@ -392,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):

         end_token = RoomStreamToken.parse(end_token)

-        rows, token = yield self.runInteraction(
+        rows, token, _ = yield self.runInteraction(
             "get_recent_event_ids_for_room", self._paginate_room_events_txn,
             room_id, from_token=end_token, limit=limit,
         )
@@ -437,15 +440,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         `room_id` causes it to return the current room specific topological
         token.
         """
-        token = yield self.get_room_max_stream_ordering()
         if room_id is None:
-            defer.returnValue("s%d" % (token,))
+            token = yield self.get_room_max_stream_ordering()
+            defer.returnValue(str(RoomStreamToken(None, None, token)))
         else:
-            topo = yield self.runInteraction(
-                "_get_max_topological_txn", self._get_max_topological_txn,
+            token = yield self.runInteraction(
+                "get_room_events_max_id", self._get_topological_token_for_room_txn,
                 room_id,
             )
-            defer.returnValue("t%d-%d" % (topo, token))
+            if not token:
+                raise Exception("Server not in room")
+            defer.returnValue(str(token))

     def get_stream_token_for_event(self, event_id):
         """The stream token for an event
@@ -460,7 +465,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             table="events",
             keyvalues={"event_id": event_id},
             retcol="stream_ordering",
-        ).addCallback(lambda row: "s%d" % (row,))
+        ).addCallback(lambda row: str(RoomStreamToken(None, None, row)))

     def get_topological_token_for_event(self, event_id):
         """The stream token for an event
@@ -469,16 +474,34 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         Raises:
             StoreError if the event wasn't in the database.
         Returns:
-            A deferred "t%d-%d" topological token.
+            A deferred topological token.
         """
         return self._simple_select_one(
             table="events",
             keyvalues={"event_id": event_id},
-            retcols=("stream_ordering", "topological_ordering"),
+            retcols=("stream_ordering", "topological_ordering", "chunk_id"),
             desc="get_topological_token_for_event",
-        ).addCallback(lambda row: "t%d-%d" % (
-            row["topological_ordering"], row["stream_ordering"],)
-        )
+        ).addCallback(lambda row: str(RoomStreamToken(
+            row["chunk_id"],
+            row["topological_ordering"],
+            row["stream_ordering"],
+        )))
+
+    def _get_topological_token_for_room_txn(self, txn, room_id):
+        sql = """
+            SELECT chunk_id, topological_ordering, stream_ordering
+            FROM events
+            NATURAL JOIN event_forward_extremities
+            WHERE room_id = ?
+            ORDER BY stream_ordering DESC
+            LIMIT 1
+        """
+        txn.execute(sql, (room_id,))
+        row = txn.fetchone()
+        if row:
+            c, t, s = row
+            return RoomStreamToken(c, t, s)
+        return None

     def get_max_topological_token(self, room_id, stream_key):
         sql = (
@@ -515,18 +538,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             null topological_ordering.
         """
         for event, row in zip(events, rows):
+            chunk = row.chunk_id
+            topo = row.topological_ordering
             stream = row.stream_ordering
-            if topo_order and row.topological_ordering:
-                topo = row.topological_ordering
-            else:
-                topo = None
             internal = event.internal_metadata
-            internal.before = str(RoomStreamToken(topo, stream - 1))
-            internal.after = str(RoomStreamToken(topo, stream))
-            internal.order = (
-                int(topo) if topo else 0,
-                int(stream),
-            )
+            internal.stream_ordering = stream
+            if topo_order:
+                internal.before = str(RoomStreamToken(chunk, topo, stream - 1))
+                internal.after = str(RoomStreamToken(chunk, topo, stream))
+            else:
+                internal.before = str(RoomStreamToken(None, None, stream - 1))
+                internal.after = str(RoomStreamToken(None, None, stream))

     @defer.inlineCallbacks
     def get_events_around(self, room_id, event_id, before_limit, after_limit):
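To make the effect of the new `_set_before_and_after` concrete, a small illustration (the token values here are invented):

    # Illustration only - after _set_before_and_after(events, rows) runs with
    # topo_order=True, each event carries its own pagination bounds:
    event = events[0]
    event.internal_metadata.before           # e.g. "c5~10~122", just before the event
    event.internal_metadata.after            # e.g. "c5~10~123", just after the event
    event.internal_metadata.stream_ordering  # e.g. 123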
@@ -586,27 +611,29 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 "event_id": event_id,
                 "room_id": room_id,
             },
-            retcols=["stream_ordering", "topological_ordering"],
+            retcols=["stream_ordering", "topological_ordering", "chunk_id"],
         )

         # Paginating backwards includes the event at the token, but paginating
         # forward doesn't.
         before_token = RoomStreamToken(
-            results["topological_ordering"] - 1,
-            results["stream_ordering"],
+            results["chunk_id"],
+            results["topological_ordering"],
+            results["stream_ordering"] - 1,
         )

         after_token = RoomStreamToken(
+            results["chunk_id"],
             results["topological_ordering"],
             results["stream_ordering"],
         )

-        rows, start_token = self._paginate_room_events_txn(
+        rows, start_token, _ = self._paginate_room_events_txn(
             txn, room_id, before_token, direction='b', limit=before_limit,
         )
         events_before = [r.event_id for r in rows]

-        rows, end_token = self._paginate_room_events_txn(
+        rows, end_token, _ = self._paginate_room_events_txn(
             txn, room_id, after_token, direction='f', limit=after_limit,
         )
         events_after = [r.event_id for r in rows]
@@ -689,12 +716,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             those that match the filter.

         Returns:
-            Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
-            as a list of _EventDictReturn and a token that points to the end
-            of the result set.
+            Deferred[tuple[list[_EventDictReturn], str, list[int]]]: Returns
+            the results as a list of _EventDictReturn, a token that points to
+            the end of the result set, and a list of chunks iterated over.
         """

-        assert int(limit) >= 0
+        limit = int(limit)  # Sometimes we are passed a string from somewhere
+        assert limit >= 0
+
+        # There are two modes of fetching events: by stream order or by
+        # topological order. This is determined by whether the from_token is a
+        # stream or topological token. If stream then we can simply do a select
+        # ordered by the stream_ordering column. If topological, then we need
+        # to fetch events from one chunk at a time until we hit the limit.

         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
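Restating the comment above as code: the pagination mode follows from the shape of the token alone. A minimal sketch (hypothetical helper, not part of this diff):

    def pagination_mode(from_token):
        # Stream tokens ("s123") carry neither chunk nor topological parts, so
        # a single SELECT ordered by stream_ordering suffices. Chunk tokens
        # ("c5~10~123") and legacy "t" tokens require walking the room's
        # chunks one at a time instead.
        if from_token.chunk is None and from_token.topological is None:
            return "stream"
        return "topological"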
@@ -725,10 +759,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             bounds += " AND " + filter_clause
             args.extend(filter_args)

-        args.append(int(limit))
+        args.append(limit)

         sql = (
-            "SELECT event_id, topological_ordering, stream_ordering"
+            "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
             " FROM events"
             " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
@@ -740,9 +774,65 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):

         txn.execute(sql, args)

-        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+        rows = [_EventDictReturn(*row) for row in txn]
+
+        # If we are paginating topologically and we haven't hit the limit on
+        # number of events then we need to fetch events from the previous or
+        # next chunk.
+
+        iterated_chunks = []
+
+        chunk_id = None
+        if from_token.chunk:  # FIXME: may be topological but no chunk.
+            if rows:
+                chunk_id = rows[-1].chunk_id
+                iterated_chunks = [r.chunk_id for r in rows]
+            else:
+                chunk_id = from_token.chunk
+                iterated_chunks = [chunk_id]
+
+        table = ChunkDBOrderedListStore(
+            txn, room_id, self.clock, self.database_engine,
+        )
+
+        if filter_clause:
+            filter_clause = "AND " + filter_clause
+
+        sql = (
+            "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
+            " FROM events"
+            " WHERE outlier = ? AND room_id = ? %(filter_clause)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s LIMIT ?"
+        ) % {
+            "filter_clause": filter_clause,
+            "order": order,
+        }
+
+        args = [False, room_id] + filter_args + [limit]
+
+        while chunk_id and (limit <= 0 or len(rows) < limit):
+            if chunk_id not in iterated_chunks:
+                iterated_chunks.append(chunk_id)
+
+            if direction == 'b':
+                chunk_id = table.get_prev(chunk_id)
+            else:
+                chunk_id = table.get_next(chunk_id)
+
+            if chunk_id is None:
+                break
+
+            txn.execute(sql, args)
+            new_rows = [_EventDictReturn(*row) for row in txn]
+
+            rows.extend(new_rows)
+
+        # We may have inserted more rows than necessary in the loop above
+        rows = rows[:limit]

         if rows:
+            chunk = rows[-1].chunk_id
             topo = rows[-1].topological_ordering
             toke = rows[-1].stream_ordering
             if direction == 'b':
@@ -752,12 +842,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 # when we are going backwards so we subtract one from the
                 # stream part.
                 toke -= 1
-            next_token = RoomStreamToken(topo, toke)
+            next_token = RoomStreamToken(chunk, topo, toke)
         else:
             # TODO (erikj): We should work out what to do here instead.
             next_token = to_token if to_token else from_token

-        return rows, str(next_token),
+        return rows, str(next_token), iterated_chunks,

     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
@@ -777,18 +867,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             those that match the filter.

         Returns:
-            tuple[list[dict], str]: Returns the results as a list of dicts and
-            a token that points to the end of the result set. The dicts have
-            the keys "event_id", "topological_ordering" and "stream_orderign".
+            tuple[list[dict], str, list[str]]: Returns the results as a list of
+            dicts, a token that points to the end of the result set, and a list
+            of backwards extremities. The dicts have the keys "event_id",
+            "topological_ordering" and "stream_ordering".
         """

         from_key = RoomStreamToken.parse(from_key)
         if to_key:
             to_key = RoomStreamToken.parse(to_key)

-        rows, token = yield self.runInteraction(
-            "paginate_room_events", self._paginate_room_events_txn,
-            room_id, from_key, to_key, direction, limit, event_filter,
+        def _do_paginate_room_events(txn):
+            rows, token, chunks = self._paginate_room_events_txn(
+                txn, room_id, from_key, to_key, direction, limit, event_filter,
+            )
+
+            # We now fetch the extremities by fetching the extremities for
+            # each chunk we iterated over.
+            extremities = []
+            seen = set()
+            for chunk_id in chunks:
+                if chunk_id in seen:
+                    continue
+                seen.add(chunk_id)
+
+                event_ids = self._simple_select_onecol_txn(
+                    txn,
+                    table="chunk_backwards_extremities",
+                    keyvalues={"chunk_id": chunk_id},
+                    retcol="event_id"
+                )
+
+                extremities.extend(e for e in event_ids if e not in extremities)
+
+            return rows, token, extremities
+
+        rows, token, extremities = yield self.runInteraction(
+            "paginate_room_events", _do_paginate_room_events,
         )

         events = yield self._get_events(
@@ -798,7 +913,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):

         self._set_before_and_after(events, rows)

-        defer.returnValue((events, token))
+        defer.returnValue((events, token, extremities))


 class StreamStore(StreamWorkerStore):
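The extra `extremities` return value surfaces the backwards extremities of every chunk the query walked. A sketch of how a caller might react to it (the `_maybe_backfill` helper is hypothetical, purely to illustrate the intent):

    @defer.inlineCallbacks
    def get_messages(self, room_id, from_key, limit):
        events, token, extremities = yield self.store.paginate_room_events(
            room_id=room_id, from_key=from_key, direction='b', limit=limit,
        )
        if extremities and len(events) < limit:
            # We ran out of locally-stored history inside the chunks we
            # walked; the extremity event IDs are natural start points for a
            # federation backfill request.
            yield self._maybe_backfill(room_id, extremities)  # hypothetical
        defer.returnValue((events, token))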
@@ -306,7 +306,7 @@ StreamToken.START = StreamToken(
 )


-class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
+class RoomStreamToken(namedtuple("_StreamToken", ("chunk", "topological", "stream"))):
     """Tokens are positions between events. The token "s1" comes after event 1.

         s0    s1
@@ -319,14 +319,18 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
     When traversing the live event stream events are ordered by when they
     arrived at the homeserver.

-    When traversing historic events the events are ordered by their depth in
-    the event graph "topological_ordering" and then by when they arrived at the
-    homeserver "stream_ordering".
+    When traversing historic events the events are ordered by the topological
+    ordering of the room graph. This is done using event chunks and the
+    `topological_ordering` column.

-    Live tokens start with an "s" followed by the "stream_ordering" id of the
-    event it comes after. Historic tokens start with a "t" followed by the
-    "topological_ordering" id of the event it comes after, followed by "-",
-    followed by the "stream_ordering" id of the event it comes after.
+    Live tokens start with an 's' and include the stream_ordering of the event
+    it comes after. Historic tokens start with a 'c' and include the chunk ID,
+    topological ordering and stream ordering of the event it comes after.
+
+    (In previous versions, when chunks were not implemented, the historic
+    tokens started with 't' and included the topological and stream ordering.
+    These tokens can be roughly converted to the new format by looking up the
+    chunk and topological ordering of the event with the same stream ordering.)
     """
     __slots__ = []

@@ -334,10 +338,19 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
     def parse(cls, string):
         try:
             if string[0] == 's':
-                return cls(topological=None, stream=int(string[1:]))
-            if string[0] == 't':
+                return cls(chunk=None, topological=None, stream=int(string[1:]))
+            if string[0] == 't':  # For backwards compat with older tokens.
                 parts = string[1:].split('-', 1)
-                return cls(topological=int(parts[0]), stream=int(parts[1]))
+                return cls(chunk=None, topological=int(parts[0]), stream=int(parts[1]))
+            if string[0] == 'c':
+                # We use '~' as both stream ordering and topological ordering
+                # can be negative, so we can't use '-'
+                parts = string[1:].split('~', 2)
+                return cls(
+                    chunk=int(parts[0]),
+                    topological=int(parts[1]),
+                    stream=int(parts[2]),
+                )
         except Exception:
             pass
         raise SynapseError(400, "Invalid token %r" % (string,))
@@ -346,12 +359,16 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
     def parse_stream_token(cls, string):
         try:
             if string[0] == 's':
-                return cls(topological=None, stream=int(string[1:]))
+                return cls(chunk=None, topological=None, stream=int(string[1:]))
         except Exception:
             pass
         raise SynapseError(400, "Invalid token %r" % (string,))

     def __str__(self):
+        if self.chunk is not None:
+            # We use '~' as both stream ordering and topological ordering
+            # can be negative, so we can't use '-'
+            return "c%d~%d~%d" % (self.chunk, self.topological, self.stream)
         if self.topological is not None:
             return "t%d-%d" % (self.topological, self.stream)
         else:
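For reference, a round trip through the three token forms handled above (illustrative values):

    RoomStreamToken.parse("s123")       # RoomStreamToken(chunk=None, topological=None, stream=123)
    RoomStreamToken.parse("t10-123")    # legacy: RoomStreamToken(chunk=None, topological=10, stream=123)
    RoomStreamToken.parse("c5~10~123")  # RoomStreamToken(chunk=5, topological=10, stream=123)

    str(RoomStreamToken(chunk=5, topological=10, stream=123))     # "c5~10~123"
    str(RoomStreamToken(chunk=None, topological=10, stream=123))  # "t10-123"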
337 synapse/util/katriel_bodlaender.py Normal file
@@ -0,0 +1,337 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module contains an implementation of the Katriel-Bodlaender algorithm,
which is used to do online topological ordering of graphs.

Note that the ordering derived from the graph is such that the source node of
an edge comes before the target node of the edge, i.e. a graph of A -> B -> C
would produce the ordering [A, B, C].

This ordering is therefore opposite to what one might expect when considering
the room DAG, as newer messages would be added to the start rather than the
end.

***The ChunkDBOrderedListStore therefore inverts the direction of edges***

See:
    A tight analysis of the Katriel–Bodlaender algorithm for online topological
    ordering
    Hsiao-Fei Liu and Kun-Mao Chao
    https://www.sciencedirect.com/science/article/pii/S0304397507006573
and:
    Online Topological Ordering
    Irit Katriel and Hans L. Bodlaender
    http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.78.7933
"""

from abc import ABCMeta, abstractmethod


class OrderedListStore(object):
    """An abstract base class that is used to store a graph and maintain a
    topologically consistent total ordering.

    Internally this uses the Katriel-Bodlaender algorithm, which requires the
    store to expose an interface for the total ordering that supports:

    - Insertion of the node into the ordering either immediately before or
      after another node.
    - Deletion of the node from the ordering
    - Comparing the relative ordering of two arbitrary nodes
    - Getting the node immediately before or after a given node in the ordering

    It also needs to be able to interact with the graph in the following ways:

    - Query the number of edges from a node in the graph
    - Query the number of edges into a node in the graph
    - Add an edge to the graph

    Users of subclasses should call `add_node` and `add_edge` whenever editing
    the graph. The total ordering exposed will remain constant until the next
    call to one of these methods.

    Note: Calls to `add_node` and `add_edge` cannot overlap, and so callers
    should perform some form of locking.
    """

    __metaclass__ = ABCMeta

    def add_node(self, node_id):
        """Adds a node to the graph.

        Args:
            node_id (str)
        """
        self._insert_before(node_id, None)

    def add_edge(self, source, target):
        """Adds a new edge to the graph and updates the ordering.

        See module level docs.

        Note that both the source and target nodes must have been inserted into
        the store (at an arbitrary position) already.

        Args:
            source (str): The source node of the new edge
            target (str): The target node of the new edge
        """

        # The following is the Katriel-Bodlaender algorithm.

        to_s = []
        from_t = []
        to_s_neighbours = []
        from_t_neighbours = []
        to_s_indegree = 0
        from_t_outdegree = 0
        s = source
        t = target

        while s and t and not self.is_before(s, t):
            m_s = to_s_indegree
            m_t = from_t_outdegree

            # These functions return lists of tuples where the first term is a
            # float that can be used to order the list of neighbours.
            # These are valid until the next write
            pe_s = self.get_nodes_with_edges_to(s)
            fe_t = self.get_nodes_with_edges_from(t)

            l_s = len(pe_s)
            l_t = len(fe_t)

            if m_s + l_s <= m_t + l_t:
                to_s.append(s)
                to_s_neighbours.extend(pe_s)
                to_s_indegree += l_s

                if to_s_neighbours:
                    to_s_neighbours.sort()
                    _, s = to_s_neighbours.pop()
                else:
                    s = None

            if m_s + l_s >= m_t + l_t:
                from_t.append(t)
                from_t_neighbours.extend(fe_t)
                from_t_outdegree += l_t

                if from_t_neighbours:
                    from_t_neighbours.sort(reverse=True)
                    _, t = from_t_neighbours.pop()
                else:
                    t = None

        if s is None:
            s = self.get_prev(target)

        if t is None:
            t = self.get_next(source)

        while to_s:
            s1 = to_s.pop()
            self._delete_ordering(s1)
            self._insert_after(s1, s)
            s = s1

        while from_t:
            t1 = from_t.pop()
            self._delete_ordering(t1)
            self._insert_before(t1, t)
            t = t1

        self._add_edge_to_graph(source, target)

    @abstractmethod
    def is_before(self, first_node, second_node):
        """Returns whether the first node is before the second node.

        Args:
            first_node (str)
            second_node (str)

        Returns:
            bool: True if first_node is before second_node
        """
        pass

    @abstractmethod
    def get_prev(self, node_id):
        """Gets the node immediately before the given node in the topological
        ordering.

        Args:
            node_id (str)

        Returns:
            str|None: A node ID or None if no preceding node exists
        """
        pass

    @abstractmethod
    def get_next(self, node_id):
        """Gets the node immediately after the given node in the topological
        ordering.

        Args:
            node_id (str)

        Returns:
            str|None: A node ID or None if no succeeding node exists
        """
        pass

    @abstractmethod
    def get_nodes_with_edges_to(self, node_id):
        """Get all nodes with edges to the given node

        Args:
            node_id (str)

        Returns:
            list[tuple[float, str]]: Returns a list of tuples of an ordering
            term and the node ID. The ordering term can be used to sort the
            returned list.
            The ordering is valid until a subsequent call to `add_edge`.
        """
        pass

    @abstractmethod
    def get_nodes_with_edges_from(self, node_id):
        """Get all nodes with edges from the given node

        Args:
            node_id (str)

        Returns:
            list[tuple[float, str]]: Returns a list of tuples of an ordering
            term and the node ID. The ordering term can be used to sort the
            returned list.
            The ordering is valid until a subsequent call to `add_edge`.
        """
        pass

    @abstractmethod
    def _insert_before(self, node_id, target_id):
        """Inserts node immediately before target node.

        If target_id is None then the node is inserted at the end of the list

        Args:
            node_id (str)
            target_id (str|None)
        """
        pass

    @abstractmethod
    def _insert_after(self, node_id, target_id):
        """Inserts node immediately after target node.

        If target_id is None then the node is inserted at the start of the list

        Args:
            node_id (str)
            target_id (str|None)
        """
        pass

    @abstractmethod
    def _delete_ordering(self, node_id):
        """Deletes the given node from the ordered list (but not the graph).

        Used when we want to reinsert it into a different position

        Args:
            node_id (str)
        """
        pass

    @abstractmethod
    def _add_edge_to_graph(self, source_id, target_id):
        """Adds an edge to the graph from source to target.

        Does not update ordering.

        Args:
            source_id (str)
            target_id (str)
        """
        pass


class InMemoryOrderedListStore(OrderedListStore):
    """An in-memory OrderedListStore
    """

    def __init__(self):
        # The ordered list of nodes
        self.list = []

        # Map from node to set of nodes that it references
        self.edges_from = {}

        # Map from node to set of nodes that it is referenced by
        self.edges_to = {}

    def is_before(self, first_node, second_node):
        return self.list.index(first_node) < self.list.index(second_node)

    def get_prev(self, node_id):
        idx = self.list.index(node_id) - 1
        if idx >= 0:
            return self.list[idx]
        else:
            return None

    def get_next(self, node_id):
        idx = self.list.index(node_id) + 1
        if idx < len(self.list):
            return self.list[idx]
        else:
            return None

    def _insert_before(self, node_id, target_id):
        if target_id is not None:
            idx = self.list.index(target_id)
            self.list.insert(idx, node_id)
        else:
            self.list.append(node_id)

    def _insert_after(self, node_id, target_id):
        if target_id is not None:
            idx = self.list.index(target_id) + 1
            self.list.insert(idx, node_id)
        else:
            self.list.insert(0, node_id)

    def _delete_ordering(self, node_id):
        self.list.remove(node_id)

    def get_nodes_with_edges_to(self, node_id):
        to_nodes = self.edges_to.get(node_id, [])
        return [(self.list.index(nid), nid) for nid in to_nodes]

    def get_nodes_with_edges_from(self, node_id):
        from_nodes = self.edges_from.get(node_id, [])
        return [(self.list.index(nid), nid) for nid in from_nodes]

    def _add_edge_to_graph(self, source_id, target_id):
        self.edges_from.setdefault(source_id, set()).add(target_id)
        self.edges_to.setdefault(target_id, set()).add(source_id)
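A quick illustration of the interface, using the in-memory store above: adding an edge whose source currently sorts after its target triggers a reorder, otherwise the ordering is left untouched.

    store = InMemoryOrderedListStore()
    for node in ("B", "A"):
        store.add_node(node)   # nodes are appended, so the list is now ["B", "A"]

    store.add_edge("A", "B")   # the edge source must sort first, so the list is fixed up
    print store.list           # ["A", "B"]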
302 tests/storage/test_chunk_linearizer_table.py Normal file
@@ -0,0 +1,302 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

import itertools
import random

import tests.unittest
import tests.utils

from fractions import Fraction

from synapse.storage.chunk_ordered_table import (
    ChunkDBOrderedListStore, find_farey_terms, get_fraction_in_range,
)


class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
    """Tests to ensure that the ordering and rebalancing functions of
    ChunkDBOrderedListStore work as expected.
    """

    def __init__(self, *args, **kwargs):
        super(ChunkLinearizerStoreTestCase, self).__init__(*args, **kwargs)

    @defer.inlineCallbacks
    def setUp(self):
        hs = yield tests.utils.setup_test_homeserver()
        self.store = hs.get_datastore()
        self.clock = hs.get_clock()

    @defer.inlineCallbacks
    def test_simple_insert_fetch(self):
        room_id = "foo_room1"

        def test_txn(txn):
            table = ChunkDBOrderedListStore(
                txn, room_id, self.clock,
                self.store.database_engine,
                5, 100,
            )

            table.add_node("A")
            table._insert_after("B", "A")
            table._insert_before("C", "A")
            table._insert_after("D", "A")

            sql = """
                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                WHERE room_id = ?
            """
            txn.execute(sql, (room_id,))

            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
            ordered = [c for _, c in ordered]

            self.assertEqual(["C", "A", "D", "B"], ordered)

        yield self.store.runInteraction("test", test_txn)

    @defer.inlineCallbacks
    def test_many_insert_fetch(self):
        room_id = "foo_room2"

        def test_txn(txn):
            table = ChunkDBOrderedListStore(
                txn, room_id, self.clock,
                self.store.database_engine,
                5, 100,
            )

            nodes = [(i, "node_%d" % (i,)) for i in xrange(1, 1000)]
            expected = [n for _, n in nodes]

            already_inserted = []

            random.shuffle(nodes)
            while nodes:
                i, node_id = nodes.pop()
                if not already_inserted:
                    table.add_node(node_id)
                else:
                    for j, target_id in already_inserted:
                        if j > i:
                            break

                    if j < i:
                        table._insert_after(node_id, target_id)
                    else:
                        table._insert_before(node_id, target_id)

                already_inserted.append((i, node_id))
                already_inserted.sort()

            sql = """
                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                WHERE room_id = ?
            """
            txn.execute(sql, (room_id,))

            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
            ordered = [c for _, c in ordered]

            self.assertEqual(expected, ordered)

        yield self.store.runInteraction("test", test_txn)

    @defer.inlineCallbacks
    def test_prepend_and_append(self):
        room_id = "foo_room3"

        def test_txn(txn):
            table = ChunkDBOrderedListStore(
                txn, room_id, self.clock,
                self.store.database_engine,
                5, 1000,
            )

            table.add_node("a")

            expected = ["a"]

            for i in xrange(1, 1000):
                node_id = "node_id_before_%d" % i
                table._insert_before(node_id, expected[0])
                expected.insert(0, node_id)

            for i in xrange(1, 1000):
                node_id = "node_id_after_%d" % i
                table._insert_after(node_id, expected[-1])
                expected.append(node_id)

            sql = """
                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                WHERE room_id = ?
            """
            txn.execute(sql, (room_id,))

            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
            ordered = [c for _, c in ordered]

            self.assertEqual(expected, ordered)

        yield self.store.runInteraction("test", test_txn)

    @defer.inlineCallbacks
    def test_worst_case(self):
        room_id = "foo_room3"

        def test_txn(txn):
            table = ChunkDBOrderedListStore(
                txn, room_id, self.clock,
                self.store.database_engine,
                5, 100,
            )

            table.add_node("a")

            prev_node = "a"

            expected_prefix = ["a"]
            expected_suffix = []

            for i in xrange(1, 100):
                node_id = "node_id_%d" % i
                if i % 2 == 0:
                    table._insert_before(node_id, prev_node)
                    expected_prefix.append(node_id)
                else:
                    table._insert_after(node_id, prev_node)
                    expected_suffix.append(node_id)
                prev_node = node_id

            sql = """
                SELECT chunk_id, numerator, denominator FROM chunk_linearized
                WHERE room_id = ?
            """
            txn.execute(sql, (room_id,))

            ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
            ordered = [c for _, c in ordered]

            expected = expected_prefix + list(reversed(expected_suffix))

            self.assertEqual(expected, ordered)

        yield self.store.runInteraction("test", test_txn)

    @defer.inlineCallbacks
    def test_get_edges_to(self):
        room_id = "foo_room4"

        def test_txn(txn):
            table = ChunkDBOrderedListStore(
                txn, room_id, self.clock,
                self.store.database_engine,
                5, 100,
            )

            table.add_node("A")
            table._insert_after("B", "A")
            table._add_edge_to_graph("A", "B")
            table._insert_before("C", "A")
            table._add_edge_to_graph("C", "A")

            nodes = table.get_nodes_with_edges_from("A")
            self.assertEqual([n for _, n in nodes], ["B"])

            nodes = table.get_nodes_with_edges_to("A")
            self.assertEqual([n for _, n in nodes], ["C"])

        yield self.store.runInteraction("test", test_txn)

    @defer.inlineCallbacks
    def test_get_next_and_prev(self):
        room_id = "foo_room5"

        def test_txn(txn):
            table = ChunkDBOrderedListStore(
                txn, room_id, self.clock,
                self.store.database_engine,
                5, 100,
            )

            table.add_node("A")
            table._insert_after("B", "A")
            table._insert_before("C", "A")

            self.assertEqual(table.get_next("A"), "B")
            self.assertEqual(table.get_prev("A"), "C")

        yield self.store.runInteraction("test", test_txn)

    def test_find_farey_terms(self):
        def _test(min_frac, max_denom):
            """Calls `find_farey_terms` with the given values and checks the
            results are neighbours in the Farey sequence.
            """

            a, b, c, d = find_farey_terms(min_frac, max_denom)

            p = Fraction(a, b)
            q = Fraction(c, d)

            assert min_frac < p < q

            for x, y in _pairwise(_farey_generator(max_denom)):
                if min_frac < x < y:
                    self.assertEqual(x, p)
                    self.assertEqual(y, q)
                    break

        _test(Fraction(5, 3), 12)
        _test(Fraction(1, 3), 12)
        _test(Fraction(1, 2), 9)
        _test(Fraction(1, 2), 10)
        _test(Fraction(1, 2), 15)

    def test_get_fraction_in_range(self):
        def _test(x, y):
            assert x < get_fraction_in_range(x, y) < y

        _test(Fraction(1, 2), Fraction(2, 3))
        _test(Fraction(1, 2), Fraction(3, 2))
        _test(Fraction(5, 203), Fraction(6, 204))


def _farey_generator(n):
    """Generates the Farey sequence of order `n`.

    Note that this doesn't terminate.

    Taken from https://en.wikipedia.org/wiki/Farey_sequence#Next_term
    """

    a, b, c, d = 0, 1, 1, n

    yield Fraction(a, b)

    while True:
        k = int((n + b) / d)
        a, b, c, d = c, d, (k * c - a), (k * d - b)
        yield Fraction(a, b)


def _pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = itertools.tee(iterable)
    next(b, None)
    return itertools.izip(a, b)
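The Farey-sequence helpers exercised above are what let the linearizer pick a rational position between two existing chunks without the denominators growing unboundedly. One classical fact behind this, sketched below: the mediant of two fractions always lies strictly between them (the real `get_fraction_in_range` in `chunk_ordered_table` may use a different construction):

    from fractions import Fraction

    def mediant(x, y):
        # For fractions x < y (with positive denominators) the mediant
        # (a+c)/(b+d) satisfies x < mediant < y.
        return Fraction(x.numerator + y.numerator, x.denominator + y.denominator)

    assert Fraction(1, 2) < mediant(Fraction(1, 2), Fraction(2, 3)) < Fraction(2, 3)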
84 tests/util/test_katriel_bodlaender.py Normal file
@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.util.katriel_bodlaender import InMemoryOrderedListStore

from tests import unittest


class KatrielBodlaenderTests(unittest.TestCase):
    def test_simple_graph(self):
        store = InMemoryOrderedListStore()

        nodes = [
            "node_1",
            "node_2",
            "node_3",
            "node_4",
        ]

        for node in nodes:
            store.add_node(node)

        store.add_edge("node_2", "node_3")
        store.add_edge("node_1", "node_2")
        store.add_edge("node_3", "node_4")

        self.assertEqual(nodes, store.list)

    def test_reverse_graph(self):
        store = InMemoryOrderedListStore()

        nodes = [
            "node_1",
            "node_2",
            "node_3",
            "node_4",
        ]

        for node in nodes:
            store.add_node(node)

        store.add_edge("node_3", "node_2")
        store.add_edge("node_2", "node_1")
        store.add_edge("node_4", "node_3")

        self.assertEqual(list(reversed(nodes)), store.list)

    def test_divergent_graph(self):
        store = InMemoryOrderedListStore()

        nodes = [
            "node_1",
            "node_2",
            "node_3",
            "node_4",
            "node_5",
            "node_6",
        ]

        for node in reversed(nodes):
            store.add_node(node)

        store.add_edge("node_2", "node_3")
        store.add_edge("node_2", "node_5")
        store.add_edge("node_1", "node_2")
        store.add_edge("node_3", "node_4")
        store.add_edge("node_1", "node_3")
        store.add_edge("node_4", "node_5")
        store.add_edge("node_5", "node_6")
        store.add_edge("node_4", "node_6")

        self.assertEqual(nodes, store.list)