Compare commits

...

7 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
e79b3aed87 I mustn't forget my morning corn flake(8)s
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:40:50 +01:00
Olivier Wilkinson (reivilibre)
279e63aea2 Collect old current stats rows when updating stats with deltas
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:36:14 +01:00
Olivier Wilkinson (reivilibre)
e4cbea6c46 Handle state deltas and turn them into stats deltas
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:24:35 +01:00
Olivier Wilkinson (reivilibre)
80a1c6e9e5 Add storage function for storing stats deltas
Old collection is not included in this commit

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:08:02 +01:00
Olivier Wilkinson (reivilibre)
d7675e79e1 Add schema for Separated Statistics
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 13:45:05 +01:00
reivilibre
8de9ebe35d Tear out current room & user statistics (#5880)
* Tear out current room & user statistics.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>

* Black is back with more linting complaints

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 13:22:07 +01:00
Olivier Wilkinson (reivilibre)
8374bcb0a8 Newsfile
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-19 15:25:53 +01:00
6 changed files with 682 additions and 751 deletions

1
changelog.d/5879.misc Normal file
View File

@@ -0,0 +1 @@
Rework room and user statistics to separate current & historical rows, as well as track stats correctly.

View File

@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
# The current position in the current_state_delta stream
self.pos = None
# Guard to ensure we only process deltas one at a time
self._is_processing = False
if hs.config.stats_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -65,43 +62,55 @@ class StatsHandler(StateDeltasHandler):
if not self.hs.config.stats_enabled:
return
if self._is_processing:
return
lock = self.store.stats_delta_processing_lock
@defer.inlineCallbacks
def process():
try:
yield self._unsafe_process()
finally:
self._is_processing = False
lock.release()
self._is_processing = True
run_as_background_process("stats.notify_new_event", process)
if lock.acquire(blocking=False):
# we only want to run this process one-at-a-time,
# and also, if the initial background updater wants us to keep out,
# we should respect that.
try:
run_as_background_process("stats.notify_new_event", process)
except: # noqa: E722 re-raised so fine
lock.release()
raise
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_stats_stream_pos()
if self.pos is None or None in self.pos.values():
self.pos = yield self.store.get_stats_positions()
# If still None then the initial background update hasn't happened yet
if self.pos is None:
# If still None then the initial background update hasn't started yet
if self.pos is None or None in self.pos.values():
return None
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(self.pos)
with Measure(self.clock, "stats_delta"):
while True:
deltas = yield self.store.get_current_state_deltas(
self.pos["state_delta_stream_id"]
)
if not deltas:
return
break
logger.info("Handling %d state deltas", len(deltas))
logger.debug("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
yield self.store.update_stats_stream_pos(self.pos)
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
event_processing_positions.labels("stats").set(self.pos)
event_processing_positions.labels("stats").set(
self.pos["state_delta_stream_id"]
)
if self.pos is not None:
yield self.store.update_stats_positions(self.pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
@@ -119,7 +128,7 @@ class StatsHandler(StateDeltasHandler):
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
token = yield self.store.get_earliest_token_for_room_stats(room_id)
token = yield self.store.get_earliest_token_for_stats("room", room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
@@ -144,44 +153,56 @@ class StatsHandler(StateDeltasHandler):
# We use stream_pos here rather than fetch by event_id as event_id
# may be None
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
now = int(now) // 1000
# quantise time to the nearest bucket
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
room_stats_delta = {}
room_stats_complete = False
if prev_event_id is None:
# this state event doesn't overwrite another,
# so it is a new effective/current state event
room_stats_delta["current_state_events"] = (
room_stats_delta.get("current_state_events", 0) + 1
)
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
prev_event_content = {}
# We take None rather than leave as a previous membership
# in the absence of a previous event because we do not want to
# reduce the leave count when a new-to-the-room user joins.
prev_membership = None
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
prev_membership = prev_event_content.get(
"membership", Membership.LEAVE
)
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
if prev_membership == membership:
continue
if prev_membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", -1
if prev_membership is None:
logger.debug("No previous membership for this user.")
elif prev_membership == Membership.JOIN:
room_stats_delta["joined_members"] = (
room_stats_delta.get("joined_members", 0) - 1
)
elif prev_membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", -1
room_stats_delta["invited_members"] = (
room_stats_delta.get("invited_members", 0) - 1
)
elif prev_membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", -1
room_stats_delta["left_members"] = (
room_stats_delta.get("left_members", 0) - 1
)
elif prev_membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", -1
room_stats_delta["banned_members"] = (
room_stats_delta.get("banned_members", 0) - 1
)
else:
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
@@ -189,20 +210,20 @@ class StatsHandler(StateDeltasHandler):
raise ValueError(err)
if membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", +1
room_stats_delta["joined_members"] = (
room_stats_delta.get("joined_members", 0) + 1
)
elif membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", +1
room_stats_delta["invited_members"] = (
room_stats_delta.get("invited_members", 0) + 1
)
elif membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", +1
room_stats_delta["left_members"] = (
room_stats_delta.get("left_members", 0) + 1
)
elif membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", +1
room_stats_delta["banned_members"] = (
room_stats_delta.get("banned_members", 0) + 1
)
else:
err = "%s is not a valid membership" % (repr(membership),)
@@ -210,26 +231,19 @@ class StatsHandler(StateDeltasHandler):
raise ValueError(err)
user_id = state_key
if self.is_mine_id(user_id):
if self.is_mine_id(user_id) and membership in (
Membership.JOIN,
Membership.LEAVE,
):
# update user_stats as it's one of our users
public = yield self._is_public_room(room_id)
if membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
-1,
)
elif membership == Membership.JOIN:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
+1,
)
field = "public_rooms" if public else "private_rooms"
delta = +1 if membership == Membership.JOIN else -1
yield self.store.update_stats_delta(
now, "user", user_id, {field: delta}
)
elif typ == EventTypes.Create:
# Newly created room. Add it with all blank portions.
@@ -246,28 +260,46 @@ class StatsHandler(StateDeltasHandler):
},
)
room_stats_complete = True
elif typ == EventTypes.JoinRules:
old_room_state = yield self.store.get_room_state(room_id)
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
# whether the room would be public anyway,
# because of history_visibility
other_field_gives_publicity = (
old_room_state["history_visibility"] == "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
if not other_field_gives_publicity:
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.RoomHistoryVisibility:
old_room_state = yield self.store.get_room_state(room_id)
yield self.store.update_room_state(
room_id,
{"history_visibility": event_content.get("history_visibility")},
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
# whether the room would be public anyway,
# because of join_rule
other_field_gives_publicity = (
old_room_state["join_rules"] == JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
if not other_field_gives_publicity:
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.Encryption:
yield self.store.update_room_state(
@@ -290,6 +322,20 @@ class StatsHandler(StateDeltasHandler):
room_id, {"canonical_alias": event_content.get("alias")}
)
if room_stats_complete:
yield self.store.update_stats_delta(
now,
"room",
room_id,
room_stats_delta,
complete_with_stream_id=stream_id,
)
elif len(room_stats_delta) > 0:
yield self.store.update_stats_delta(
now, "room", room_id, room_stats_delta
)
@defer.inlineCallbacks
def update_public_room_stats(self, ts, room_id, is_public):
"""
@@ -308,10 +354,13 @@ class StatsHandler(StateDeltasHandler):
for user_id in user_ids:
if self.hs.is_mine(UserID.from_string(user_id)):
yield self.store.update_stats_delta(
ts, "user", user_id, "public_rooms", +1 if is_public else -1
)
yield self.store.update_stats_delta(
ts, "user", user_id, "private_rooms", -1 if is_public else +1
ts,
"user",
user_id,
{
"public_rooms": +1 if is_public else -1,
"private_rooms": -1 if is_public else +1,
},
)
@defer.inlineCallbacks

View File

@@ -0,0 +1,148 @@
/* Copyright 2018 New Vector Ltd
* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
----- First clean up from previous versions of room stats.
-- First remove old stats stuff
DROP TABLE IF EXISTS room_stats;
DROP TABLE IF EXISTS user_stats;
DROP TABLE IF EXISTS room_stats_earliest_tokens;
DROP TABLE IF EXISTS _temp_populate_stats_position;
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
DROP TABLE IF EXISTS stats_stream_pos;
-- Unschedule old background updates if they're still scheduled
DELETE FROM background_updates WHERE update_name IN (
'populate_stats_createtables',
'populate_stats_process_rooms',
'populate_stats_cleanup'
);
----- Create tables for our version of room stats.
-- single-row table to track position of incremental updates
CREATE TABLE IF NOT EXISTS stats_incremental_position (
-- the stream_id of the last-processed state delta
state_delta_stream_id BIGINT,
-- the stream_ordering of the last-processed backfilled event
-- (this is negative)
total_events_min_stream_ordering BIGINT,
-- the stream_ordering of the last-processed normally-created event
-- (this is positive)
total_events_max_stream_ordering BIGINT,
-- If true, this represents the contract agreed upon by the background
-- population processor.
-- If false, this is suitable for use by the delta/incremental processor.
is_background_contract BOOLEAN NOT NULL PRIMARY KEY
);
-- insert a null row and make sure it is the only one.
DELETE FROM stats_incremental_position;
INSERT INTO stats_incremental_position (
state_delta_stream_id,
total_events_min_stream_ordering,
total_events_max_stream_ordering,
is_background_contract
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
-- represents PRESENT room statistics for a room
CREATE TABLE IF NOT EXISTS room_stats_current (
room_id TEXT NOT NULL PRIMARY KEY,
-- These starts cover the time from start_ts...end_ts (in seconds).
-- Note that end_ts is quantised, and start_ts usually so.
start_ts BIGINT,
end_ts BIGINT,
current_state_events INT NOT NULL DEFAULT 0,
total_events INT NOT NULL DEFAULT 0,
joined_members INT NOT NULL DEFAULT 0,
invited_members INT NOT NULL DEFAULT 0,
left_members INT NOT NULL DEFAULT 0,
banned_members INT NOT NULL DEFAULT 0,
-- If initial background count is still to be performed: NULL
-- If initial background count has been performed: the maximum delta stream
-- position that this row takes into account.
completed_delta_stream_id BIGINT,
CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL))
);
-- represents HISTORICAL room statistics for a room
CREATE TABLE IF NOT EXISTS room_stats_historical (
room_id TEXT NOT NULL,
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds).
-- Note that end_ts is quantised, and start_ts usually so.
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
current_state_events INT NOT NULL,
total_events INT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
PRIMARY KEY (room_id, end_ts)
);
-- We use this index to speed up deletion of ancient room stats.
CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular room.)
-- represents PRESENT statistics for a user
CREATE TABLE IF NOT EXISTS user_stats_current (
user_id TEXT NOT NULL PRIMARY KEY,
-- The timestamp that represents the start of the
start_ts BIGINT,
end_ts BIGINT,
public_rooms INT DEFAULT 0 NOT NULL,
private_rooms INT DEFAULT 0 NOT NULL,
-- If initial background count is still to be performed: NULL
-- If initial background count has been performed: the maximum delta stream
-- position that this row takes into account.
completed_delta_stream_id BIGINT
);
-- represents HISTORICAL statistics for a user
CREATE TABLE IF NOT EXISTS user_stats_historical (
user_id TEXT NOT NULL,
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
public_rooms INT NOT NULL,
private_rooms INT NOT NULL,
PRIMARY KEY (user_id, end_ts)
);
-- We use this index to speed up deletion of ancient user stats.
CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular user.)

View File

@@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This schema delta will be run after 'stats_separated1.sql' due to lexicographic
# ordering. Note that it MUST be so.
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
def _run_create_generic(stats_type, cursor, database_engine):
"""
Creates the pertinent (partial, if supported) indices for one kind of stats.
Args:
stats_type: "room" or "user" - the type of stats
cursor: Database Cursor
database_engine: Database Engine
"""
if isinstance(database_engine, Sqlite3Engine):
# even though SQLite >= 3.8 can support partial indices, we won't enable
# them, in case the SQLite database may be later used on another system.
# It's also the case that SQLite is only likely to be used in small
# deployments or testing, where the optimisations gained by use of a
# partial index are not a big concern.
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
ON %s_stats_current (end_ts);
"""
% (stats_type, stats_type)
)
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS %s_stats_not_complete
ON %s_stats_current (completed_delta_stream_id, %s_id);
"""
% (stats_type, stats_type, stats_type)
)
elif isinstance(database_engine, PostgresEngine):
# This partial index helps us with finding dirty stats rows
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
ON %s_stats_current (end_ts)
WHERE end_ts IS NOT NULL;
"""
% (stats_type, stats_type)
)
# This partial index helps us with old collection
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS %s_stats_not_complete
ON %s_stats_current (%s_id)
WHERE completed_delta_stream_id IS NULL;
"""
% (stats_type, stats_type, stats_type)
)
else:
raise NotImplementedError("Unknown database engine.")
def run_create(cursor, database_engine):
"""
This function is called as part of the schema delta.
It will create indices - partial, if supported - for the new 'separated'
room & user statistics.
"""
_run_create_generic("room", cursor, database_engine)
_run_create_generic("user", cursor, database_engine)
def run_upgrade(cur, database_engine, config):
"""
This function is run on a database upgrade (of a non-empty database).
We have no need to do anything specific here.
"""
pass

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,11 +15,12 @@
# limitations under the License.
import logging
from itertools import chain
from threading import Lock
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.storage.prepare_database import get_statements
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
@@ -32,14 +34,21 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members",
"left_members",
"banned_members",
"state_events",
"total_events",
),
"user": ("public_rooms", "private_rooms"),
}
TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
# these fields are per-timeslice and so should be reset to 0 upon a new slice
PER_SLICE_FIELDS = {"room": (), "user": ()}
TEMP_TABLE = "_temp_populate_stats"
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
class OldCollectionRequired(Exception):
""" Signal that we need to collect old stats rows and retry. """
pass
class StatsStore(StateDeltasStore):
@@ -51,291 +60,110 @@ class StatsStore(StateDeltasStore):
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
self.register_background_update_handler(
"populate_stats_createtables", self._populate_stats_createtables
)
self.register_background_update_handler(
"populate_stats_process_rooms", self._populate_stats_process_rooms
)
self.register_background_update_handler(
"populate_stats_cleanup", self._populate_stats_cleanup
)
self.stats_delta_processing_lock = Lock()
@defer.inlineCallbacks
def _populate_stats_createtables(self, progress, batch_size):
self.register_noop_background_update("populate_stats_createtables")
self.register_noop_background_update("populate_stats_process_rooms")
self.register_noop_background_update("populate_stats_cleanup")
if not self.stats_enabled:
yield self._end_background_update("populate_stats_createtables")
return 1
# Get all the rooms that we want to process.
def _make_staging_area(txn):
# Create the temporary tables
stmts = get_statements(
"""
-- We just recreate the table, we'll be reinserting the
-- correct entries again later anyway.
DROP TABLE IF EXISTS {temp}_rooms;
CREATE TABLE IF NOT EXISTS {temp}_rooms(
room_id TEXT NOT NULL,
events BIGINT NOT NULL
);
CREATE INDEX {temp}_rooms_events
ON {temp}_rooms(events);
CREATE INDEX {temp}_rooms_id
ON {temp}_rooms(room_id);
""".format(
temp=TEMP_TABLE
).splitlines()
)
for statement in stmts:
txn.execute(statement)
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_position(position TEXT NOT NULL)"
)
txn.execute(sql)
# Get rooms we want to process from the database, only adding
# those that we haven't (i.e. those not in room_stats_earliest_token)
sql = """
INSERT INTO %s_rooms (room_id, events)
SELECT c.room_id, count(*) FROM current_state_events AS c
LEFT JOIN room_stats_earliest_token AS t USING (room_id)
WHERE t.room_id IS NULL
GROUP BY c.room_id
""" % (
TEMP_TABLE,
)
txn.execute(sql)
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
self.get_earliest_token_for_room_stats.invalidate_all()
yield self._end_background_update("populate_stats_createtables")
return 1
@defer.inlineCallbacks
def _populate_stats_cleanup(self, progress, batch_size):
def quantise_stats_time(self, ts):
"""
Update the user directory stream position, then clean up the old tables.
Quantises a timestamp to be a multiple of the bucket size.
Args:
ts: the timestamp to quantise, in seconds since the Unix Epoch
Returns:
a timestamp which
- is divisible by the bucket size;
- is no later than `ts`; and
- is the largest such timestamp.
"""
if not self.stats_enabled:
yield self._end_background_update("populate_stats_cleanup")
return 1
return (ts // self.stats_bucket_size) * self.stats_bucket_size
position = yield self._simple_select_one_onecol(
TEMP_TABLE + "_position", None, "position"
)
yield self.update_stats_stream_pos(position)
def _delete_staging_area(txn):
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
yield self._end_background_update("populate_stats_cleanup")
return 1
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms")
return 1
# If we don't have progress filed, delete everything.
if not progress:
yield self.delete_all_stats()
def _get_next_batch(txn):
# Only fetch 250 rooms, so we don't fetch too many at once, even
# if those 250 rooms have less than batch_size state events.
sql = """
SELECT room_id, events FROM %s_rooms
ORDER BY events DESC
LIMIT 250
""" % (
TEMP_TABLE,
)
txn.execute(sql)
rooms_to_work_on = txn.fetchall()
if not rooms_to_work_on:
return None
# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
progress["remaining"] = txn.fetchone()[0]
return rooms_to_work_on
rooms_to_work_on = yield self.runInteraction(
"populate_stats_temp_read", _get_next_batch
)
# No more rooms -- complete the transaction.
if not rooms_to_work_on:
yield self._end_background_update("populate_stats_process_rooms")
return 1
logger.info(
"Processing the next %d rooms of %d remaining",
len(rooms_to_work_on),
progress["remaining"],
)
# Number of state events we've processed by going through each room
processed_event_count = 0
for room_id, event_count in rooms_to_work_on:
current_state_ids = yield self.get_current_state_ids(room_id)
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
history_visibility_id = current_state_ids.get(
(EventTypes.RoomHistoryVisibility, "")
)
encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
name_id = current_state_ids.get((EventTypes.Name, ""))
topic_id = current_state_ids.get((EventTypes.Topic, ""))
avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
event_ids = [
join_rules_id,
history_visibility_id,
encryption_id,
name_id,
topic_id,
avatar_id,
canonical_alias_id,
]
state_events = yield self.get_events(
[ev for ev in event_ids if ev is not None]
)
def _get_or_none(event_id, arg):
event = state_events.get(event_id)
if event:
return event.content.get(arg)
return None
yield self.update_room_state(
room_id,
{
"join_rules": _get_or_none(join_rules_id, "join_rule"),
"history_visibility": _get_or_none(
history_visibility_id, "history_visibility"
),
"encryption": _get_or_none(encryption_id, "algorithm"),
"name": _get_or_none(name_id, "name"),
"topic": _get_or_none(topic_id, "topic"),
"avatar": _get_or_none(avatar_id, "url"),
"canonical_alias": _get_or_none(canonical_alias_id, "alias"),
},
)
now = self.hs.get_reactor().seconds()
# quantise time to the nearest bucket
now = (now // self.stats_bucket_size) * self.stats_bucket_size
def _fetch_data(txn):
# Get the current token of the room
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
current_state_events = len(current_state_ids)
membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
total_state_events = self._get_total_state_event_counts_txn(
txn, room_id
)
self._update_stats_txn(
txn,
"room",
room_id,
now,
{
"bucket_size": self.stats_bucket_size,
"current_state_events": current_state_events,
"joined_members": membership_counts.get(Membership.JOIN, 0),
"invited_members": membership_counts.get(Membership.INVITE, 0),
"left_members": membership_counts.get(Membership.LEAVE, 0),
"banned_members": membership_counts.get(Membership.BAN, 0),
"state_events": total_state_events,
},
)
self._simple_insert_txn(
txn,
"room_stats_earliest_token",
{"room_id": room_id, "token": current_token},
)
# We've finished a room. Delete it from the table.
self._simple_delete_one_txn(
txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
)
yield self.runInteraction("update_room_stats", _fetch_data)
# Update the remaining counter.
progress["remaining"] -= 1
yield self.runInteraction(
"populate_stats",
self._background_update_progress_txn,
"populate_stats_process_rooms",
progress,
)
processed_event_count += event_count
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
return processed_event_count
return processed_event_count
def delete_all_stats(self):
def get_stats_positions(self, for_initial_processor=False):
"""
Delete all statistics records.
Returns the stats processor positions.
Args:
for_initial_processor (bool, optional): If true, returns the position
promised by the latest stats regeneration, rather than the current
incremental processor's position.
Otherwise (if false), return the incremental processor's position.
Returns (dict):
Dict containing :-
state_delta_stream_id: stream_id of last-processed state delta
total_events_min_stream_ordering: stream_ordering of latest-processed
backfilled event, in the context of total_events counting.
total_events_max_stream_ordering: stream_ordering of latest-processed
non-backfilled event, in the context of total_events counting.
"""
def _delete_all_stats_txn(txn):
txn.execute("DELETE FROM room_state")
txn.execute("DELETE FROM room_stats")
txn.execute("DELETE FROM room_stats_earliest_token")
txn.execute("DELETE FROM user_stats")
return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
def get_stats_stream_pos(self):
return self._simple_select_one_onecol(
table="stats_stream_pos",
keyvalues={},
retcol="stream_id",
desc="stats_stream_pos",
return self._simple_select_one(
table="stats_incremental_position",
keyvalues={"is_background_contract": for_initial_processor},
retcols=(
"state_delta_stream_id",
"total_events_min_stream_ordering",
"total_events_max_stream_ordering",
),
desc="stats_incremental_position",
)
def update_stats_stream_pos(self, stream_id):
def _get_stats_positions_txn(self, txn, for_initial_processor=False):
"""
See L{get_stats_positions}.
Args:
txn (cursor): Database cursor
"""
return self._simple_select_one_txn(
txn=txn,
table="stats_incremental_position",
keyvalues={"is_background_contract": for_initial_processor},
retcols=(
"state_delta_stream_id",
"total_events_min_stream_ordering",
"total_events_max_stream_ordering",
),
)
def update_stats_positions(self, positions, for_initial_processor=False):
"""
Updates the stats processor positions.
Args:
positions: See L{get_stats_positions}
for_initial_processor: See L{get_stats_positions}
"""
if positions is None:
positions = {
"state_delta_stream_id": None,
"total_events_min_stream_ordering": None,
"total_events_max_stream_ordering": None,
}
return self._simple_update_one(
table="stats_stream_pos",
keyvalues={},
updatevalues={"stream_id": stream_id},
desc="update_stats_stream_pos",
table="stats_incremental_position",
keyvalues={"is_background_contract": for_initial_processor},
updatevalues=positions,
desc="update_stats_incremental_position",
)
def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
"""
See L{update_stats_positions}
"""
if positions is None:
positions = {
"state_delta_stream_id": None,
"total_events_min_stream_ordering": None,
"total_events_max_stream_ordering": None,
}
return self._simple_update_one_txn(
txn,
table="stats_incremental_position",
keyvalues={"is_background_contract": for_initial_processor},
updatevalues=positions,
)
def update_room_state(self, room_id, fields):
@@ -367,36 +195,8 @@ class StatsStore(StateDeltasStore):
desc="update_room_state",
)
def get_deltas_for_room(self, room_id, start, size=100):
"""
Get statistics deltas for a given room.
Args:
room_id (str)
start (int): Pagination start. Number of entries, not timestamp.
size (int): How many entries to return.
Returns:
Deferred[list[dict]], where the dict has the keys of
ABSOLUTE_STATS_FIELDS["room"] and "ts".
"""
return self._simple_select_list_paginate(
"room_stats",
{"room_id": room_id},
"ts",
start,
size,
retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
order_direction="DESC",
)
def get_all_room_state(self):
return self._simple_select_list(
"room_state", None, retcols=("name", "topic", "canonical_alias")
)
@cached()
def get_earliest_token_for_room_stats(self, room_id):
def get_earliest_token_for_stats(self, stats_type, id):
"""
Fetch the "earliest token". This is used by the room stats delta
processor to ignore deltas that have been processed between the
@@ -406,79 +206,229 @@ class StatsStore(StateDeltasStore):
Returns:
Deferred[int]
"""
table, id_col = TYPE_TO_TABLE[stats_type]
return self._simple_select_one_onecol(
"room_stats_earliest_token",
{"room_id": room_id},
retcol="token",
"%s_current" % (table,),
{id_col: id},
retcol="completed_delta_stream_id",
allow_none=True,
)
def update_stats(self, stats_type, stats_id, ts, fields):
table, id_col = TYPE_TO_ROOM[stats_type]
return self._simple_upsert(
table=table,
keyvalues={id_col: stats_id, "ts": ts},
values=fields,
desc="update_stats",
def _collect_old_txn(self, txn, stats_type, limit=500):
"""
See {collect_old}. Runs only a small batch, specified by limit.
Returns (bool):
True iff there is possibly more to do (i.e. this needs re-running),
False otherwise.
"""
# we do them in batches to prevent concurrent updates from
# messing us over with lots of retries
now = self.hs.get_reactor().seconds()
quantised_ts = self.quantise_stats_time(now)
table, id_col = TYPE_TO_TABLE[stats_type]
fields = ", ".join(
field
for field in chain(
ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type]
)
)
def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
table, id_col = TYPE_TO_ROOM[stats_type]
return self._simple_upsert_txn(
txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
)
def update_stats_delta(self, ts, stats_type, stats_id, field, value):
def _update_stats_delta(txn):
table, id_col = TYPE_TO_ROOM[stats_type]
# `end_ts IS NOT NULL` is for partial index optimisation
if isinstance(self.database_engine, Sqlite3Engine):
# SQLite doesn't support SELECT FOR UPDATE
sql = (
"SELECT * FROM %s"
" WHERE %s=? and ts=("
" SELECT MAX(ts) FROM %s"
" WHERE %s=?"
")"
) % (table, id_col, table, id_col)
txn.execute(sql, (stats_id, stats_id))
rows = self.cursor_to_dict(txn)
if len(rows) == 0:
# silently skip as we don't have anything to apply a delta to yet.
# this tries to minimise any race between the initial sync and
# subsequent deltas arriving.
return
"SELECT %s FROM %s_current"
" WHERE end_ts <= ? AND end_ts IS NOT NULL"
" LIMIT %d"
) % (id_col, table, limit)
else:
sql = (
"SELECT %s FROM %s_current"
" WHERE end_ts <= ? AND end_ts IS NOT NULL"
" LIMIT %d FOR UPDATE"
) % (id_col, table, limit)
txn.execute(sql, (quantised_ts,))
maybe_more = txn.rowcount == limit
updates = txn.fetchall()
current_ts = ts
latest_ts = rows[0]["ts"]
if current_ts < latest_ts:
# This one is in the past, but we're just encountering it now.
# Mark it as part of the current bucket.
current_ts = latest_ts
elif ts != latest_ts:
# we have to copy our absolute counters over to the new entry.
values = {
key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
}
values[id_col] = stats_id
values["ts"] = ts
values["bucket_size"] = self.stats_bucket_size
sql = (
"INSERT INTO %s_historical (%s, %s, bucket_size, end_ts)"
" SELECT %s, %s, end_ts - start_ts AS bucket_size, end_ts"
" FROM %s_current WHERE %s = ?"
) % (table, id_col, fields, id_col, fields, table, id_col)
txn.executemany(sql, updates)
self._simple_insert_txn(txn, table=table, values=values)
sql = ("UPDATE %s_current SET start_ts = NULL, end_ts = NULL WHERE %s = ?") % (
table,
id_col,
)
txn.executemany(sql, updates)
# actually update the new value
if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
self._simple_update_txn(
txn,
table=table,
keyvalues={id_col: stats_id, "ts": current_ts},
updatevalues={field: value},
return maybe_more
@defer.inlineCallbacks
def collect_old(self, stats_type):
"""
Run 'old collection' on current stats rows.
Old collection is the process of copying dirty (updated) stats rows
from the current table to the historical table, when those rows have
finished their stats time slice.
Collected rows are then cleared of their dirty status.
Args:
stats_type: "room" or "user" the type of stats to run old collection
on.
"""
while True:
maybe_more = yield self.runInteraction(
"stats_collect_old", self._collect_old_txn, stats_type
)
if not maybe_more:
return None
@defer.inlineCallbacks
def update_stats_delta(
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
):
"""
Updates the statistics for a subject, with a delta (difference/relative
change).
Args:
ts (int): timestamp of the change
stats_type (str): "room" or "user" the kind of subject
stats_id (str): the subject's ID (room ID or user ID)
fields (dict[str, int]): Deltas of stats values.
complete_with_stream_id (int, optional):
If supplied, converts an incomplete row into a complete row,
with the supplied stream_id marked as the stream_id where the
row was completed.
"""
while True:
try:
res = yield self.runInteraction(
"update_stats_delta",
self._update_stats_delta_txn,
ts,
stats_type,
stats_id,
fields,
complete_with_stream_id=complete_with_stream_id,
)
else:
sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
table,
field,
field,
id_col,
)
txn.execute(sql, (value, stats_id, current_ts))
return res
except OldCollectionRequired:
# retry after collecting old rows
yield self.collect_old(stats_type)
return self.runInteraction("update_stats_delta", _update_stats_delta)
def _update_stats_delta_txn(
self,
txn,
ts,
stats_type,
stats_id,
fields,
complete_with_stream_id=None,
absolute_fields=None,
):
"""
See L{update_stats_delta}
Additional Args:
absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
"""
table, id_col = TYPE_TO_TABLE[stats_type]
quantised_ts = self.quantise_stats_time(int(ts))
end_ts = quantised_ts + self.stats_bucket_size
field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
field_values = list(fields.values())
if absolute_fields is not None:
field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
field_values += list(absolute_fields.values())
if complete_with_stream_id is not None:
field_sqls.append("completed_delta_stream_id = ?")
field_values.append(complete_with_stream_id)
sql = (
"UPDATE %s_current SET end_ts = ?, %s"
" WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
" AND %s = ?"
) % (table, ", ".join(field_sqls), id_col)
qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
txn.execute(sql, qargs)
if txn.rowcount > 0:
# success.
return
# if we're here, it's because we didn't succeed in updating a stats
# row. Why? Let's find out…
current_row = self._simple_select_one_txn(
txn,
table + "_current",
{id_col: stats_id},
("end_ts", "completed_delta_stream_id"),
allow_none=True,
)
if current_row is None:
# we need to insert a row! (insert a dirty, incomplete row)
insertee = {
id_col: stats_id,
"end_ts": end_ts,
"start_ts": ts,
"completed_delta_stream_id": complete_with_stream_id,
}
# we assume that, by default, blank fields should be zero.
for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
insertee[field_name] = 0
for field_name in PER_SLICE_FIELDS[stats_type]:
insertee[field_name] = 0
for (field, value) in fields.items():
insertee[field] = value
if absolute_fields is not None:
for (field, value) in absolute_fields.items():
insertee[field] = value
self._simple_insert_txn(txn, table + "_current", insertee)
elif current_row["end_ts"] is None:
# update the row, including start_ts
sql = (
"UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
" WHERE end_ts IS NULL AND %s = ?"
) % (table, ", ".join(field_sqls), id_col)
qargs = (
[end_ts - self.stats_bucket_size, end_ts]
+ list(field_values)
+ [stats_id]
)
txn.execute(sql, qargs)
if txn.rowcount == 0:
raise RuntimeError(
"Should be impossible: No rows updated"
" but all conditions are known to be met."
)
elif current_row["end_ts"] < end_ts:
# we need to perform old collection first
raise OldCollectionRequired()

View File

@@ -1,304 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
class StatsRoomTests(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
self.handler = self.hs.get_stats_handler()
def _add_background_updates(self):
"""
Add the background updates we need to run.
"""
# Ugh, have to reset this flag
self.store._all_done = False
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
"depends_on": "populate_stats_createtables",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
def test_initial_room(self):
"""
The background updates will build the table from scratch.
"""
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 0)
# Disable stats
self.hs.config.stats_enabled = False
self.handler.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
)
# Stats disabled, shouldn't have done anything
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 0)
# Enable stats
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
# Do the initial population of the user directory via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["topic"], "foo")
def test_initial_earliest_token(self):
"""
Ingestion via notify_new_event will ignore tokens that the background
update have already processed.
"""
self.reactor.advance(86401)
self.hs.config.stats_enabled = False
self.handler.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
u2 = self.register_user("u2", "pass")
u2_token = self.login("u2", "pass")
u3 = self.register_user("u3", "pass")
u3_token = self.login("u3", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
)
# Begin the ingestion by creating the temp tables. This will also store
# the position that the deltas should begin at, once they take over.
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
self.store._all_done = False
self.get_success(self.store.update_stats_stream_pos(None))
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
)
)
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
# Now, before the table is actually ingested, add some more events.
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
self.helper.join(room=room_1, user=u2, tok=u2_token)
# Now do the initial ingestion.
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
self.store._all_done = False
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
self.reactor.advance(86401)
# Now add some more events, triggering ingestion. Because of the stream
# position being set to before the events sent in the middle, a simpler
# implementation would reprocess those events, and say there were four
# users, not three.
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
self.helper.join(room=room_1, user=u3, tok=u3_token)
# Get the deltas! There should be two -- day 1, and day 2.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
# The oldest has 2 joined members
self.assertEqual(r[-1]["joined_members"], 2)
# The newest has 3
self.assertEqual(r[0]["joined_members"], 3)
def test_incorrect_state_transition(self):
"""
If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
(JOIN, INVITE, LEAVE, BAN), an error is raised.
"""
events = {
"a1": {"membership": Membership.LEAVE},
"a2": {"membership": "not a real thing"},
}
def get_event(event_id, allow_none=True):
m = Mock()
m.content = events[event_id]
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d
def get_received_ts(event_id):
return defer.succeed(1)
self.store.get_received_ts = get_received_ts
self.store.get_event = get_event
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user",
"room_id": "room",
"event_id": "a1",
"prev_event_id": "a2",
"stream_id": 60,
}
]
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid prev_membership"
)
# And the other way...
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user",
"room_id": "room",
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": 100,
}
]
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid membership"
)
def test_redacted_prev_event(self):
"""
If the prev_event does not exist, then it is assumed to be a LEAVE.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
# Do the initial population of the user directory via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
events = {"a1": None, "a2": {"membership": Membership.JOIN}}
def get_event(event_id, allow_none=True):
if events.get(event_id):
m = Mock()
m.content = events[event_id]
else:
m = None
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d
def get_received_ts(event_id):
return defer.succeed(1)
self.store.get_received_ts = get_received_ts
self.store.get_event = get_event
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user:test",
"room_id": room_1,
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": 100,
}
]
# Handle our fake deltas, which has a user going from LEAVE -> JOIN.
self.get_success(self.handler._handle_deltas(deltas))
# One delta, with two joined members -- the room creator, and our fake
# user.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["joined_members"], 2)