Compare commits

...

33 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
a344ad3d3f Code formatting (Black)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-28 09:33:03 +01:00
reivilibre
b9f1adc370 Update synapse/storage/stats.py
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-28 09:01:25 +01:00
Olivier Wilkinson (reivilibre)
1af7866562 Clean up code with improved naming and hoist around functions.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 16:16:53 +01:00
Olivier Wilkinson (reivilibre)
324f21b216 Fix logic error.
`absolute_fields` being None shouldn't preclude completion of a current
stats row.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:53:45 +01:00
Olivier Wilkinson (reivilibre)
62b1250629 Update _purge_room_txn to take account of separated stats tables
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:24:56 +01:00
Olivier Wilkinson (reivilibre)
11c4e506bd Rename room_state table to room_stats_state
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:24:25 +01:00
Olivier Wilkinson (reivilibre)
491eaf0808 Remove obsolete OldCollectionRequired as old collection is obsolete.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:19:38 +01:00
Olivier Wilkinson (reivilibre)
c775f310e9 Don't include the room & user stats docs in this PR.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:51:25 +01:00
Olivier Wilkinson (reivilibre)
09cbc3a8e9 Switch to milliseconds in room/user stats for consistency.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:50:58 +01:00
Olivier Wilkinson (reivilibre)
736ac58e11 Code formatting (Black)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:42:33 +01:00
Olivier Wilkinson (reivilibre)
a6c102009e Lock tables in upsert fall-backs.
Should not be too much of a performance concern as this code won't be
hit on Postgres, which large deployments should be using.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:41:48 +01:00
Olivier Wilkinson (reivilibre)
544ba2c2e9 Apply minor suggestions from review
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:40:00 +01:00
Olivier Wilkinson (reivilibre)
81c5289c83 Clarify _update_stats_delta_txn by adding code comments and kwargs.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:35:37 +01:00
reivilibre
4b7bf2e413 Apply suggestions from code review
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-27 13:26:08 +01:00
Olivier Wilkinson (reivilibre)
5043ef801a Merge branch 'rei/rss_target' into rei/rss_inc2 2019-08-27 11:56:34 +01:00
Olivier Wilkinson (reivilibre)
baeaf00a12 Merge branch 'develop' into rei/rss_target 2019-08-27 11:55:27 +01:00
Olivier Wilkinson (reivilibre)
1ecd1a6a5f Use engine-specific delta SQL files rather than delta written in Python.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 09:50:49 +01:00
Olivier Wilkinson (reivilibre)
c3d2bf2807 Allow schema deltas to be engine-specific
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 08:52:20 +01:00
Olivier Wilkinson (reivilibre)
79252d1c83 Fix up historical stats support.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-22 16:10:32 +01:00
Olivier Wilkinson (reivilibre)
e8fc180d4d Fix up SQL schema delta
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-22 16:10:05 +01:00
Olivier Wilkinson (reivilibre)
7b657f1148 Simplify table structure
This obviates the need for old collection, but comes at the minor cost
of not being able to track historical stats or per-slice fields until
after the statistics regenerator is finished.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-22 15:40:58 +01:00
Olivier Wilkinson (reivilibre)
18a4c03c50 Remove needless defaults.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 16:04:04 +01:00
Olivier Wilkinson (reivilibre)
eafa8d3c54 Unify name of 'stats regenerator' in schema comments.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 16:03:37 +01:00
Olivier Wilkinson (reivilibre)
977310ee27 Clarify _update_stats_delta_txn
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 15:49:00 +01:00
Olivier Wilkinson (reivilibre)
981c6cf544 Sanitise accepted fields in _update_stats_delta_txn
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 15:41:10 +01:00
Olivier Wilkinson (reivilibre)
6a19f7e101 Add room and user statistics documentation.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 15:35:20 +01:00
reivilibre
4a97eef0dc Update synapse/storage/stats.py
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-20 15:12:21 +01:00
reivilibre
b5573c0ffb Update synapse/storage/stats.py
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-20 15:02:49 +01:00
Olivier Wilkinson (reivilibre)
1819563640 Ack, isort!
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:27:47 +01:00
Olivier Wilkinson (reivilibre)
80a1c6e9e5 Add storage function for storing stats deltas
Old collection is not included in this commit

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:08:02 +01:00
Olivier Wilkinson (reivilibre)
d7675e79e1 Add schema for Separated Statistics
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 13:45:05 +01:00
reivilibre
8de9ebe35d Tear out current room & user statistics (#5880)
* Tear out current room & user statistics.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>

* Black is back with more linting complaints

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 13:22:07 +01:00
Olivier Wilkinson (reivilibre)
8374bcb0a8 Newsfile
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-19 15:25:53 +01:00
9 changed files with 480 additions and 987 deletions

changelog.d/5879.misc (new file)

@@ -0,0 +1 @@
Rework room and user statistics to separate current & historical rows, as well as track stats correctly.


@@ -27,19 +27,16 @@ class StatsConfig(Config):
     def read_config(self, config, **kwargs):
         self.stats_enabled = True
-        self.stats_bucket_size = 86400
+        self.stats_bucket_size = 86400 * 1000
         self.stats_retention = sys.maxsize
         stats_config = config.get("stats", None)
         if stats_config:
             self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
-            self.stats_bucket_size = (
-                self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
-            )
-            self.stats_retention = (
-                self.parse_duration(
-                    stats_config.get("retention", "%ds" % (sys.maxsize,))
-                )
-                / 1000
-            )
+            self.stats_bucket_size = self.parse_duration(
+                stats_config.get("bucket_size", "1d")
+            )
+            self.stats_retention = self.parse_duration(
+                stats_config.get("retention", "%ds" % (sys.maxsize,))
+            )
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):

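The net effect of this hunk is that `stats_bucket_size` is kept in milliseconds end to end. A minimal sketch of the unit change, using a hypothetical stand-in for Synapse's `parse_duration` config helper (assumed here to return milliseconds for inputs like "1d", as the real helper does):

# Hypothetical stand-in for Config.parse_duration; not part of the diff.
def parse_duration(value):
    units = {"s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}
    return int(value[:-1]) * units[value[-1]]

# Previously the parsed value was divided by 1000 (seconds); now the default
# (86400 * 1000) and the parsed value agree on milliseconds.
assert parse_duration("1d") == 86400 * 1000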

@@ -15,14 +15,7 @@
import logging
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -49,9 +42,6 @@ class StatsHandler(StateDeltasHandler):
# The current position in the current_state_delta stream
self.pos = None
# Guard to ensure we only process deltas one at a time
self._is_processing = False
if hs.config.stats_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -62,272 +52,4 @@ class StatsHandler(StateDeltasHandler):
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
if not self.hs.config.stats_enabled:
return
if self._is_processing:
return
@defer.inlineCallbacks
def process():
try:
yield self._unsafe_process()
finally:
self._is_processing = False
self._is_processing = True
run_as_background_process("stats.notify_new_event", process)
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_stats_stream_pos()
# If still None then the initial background update hasn't happened yet
if self.pos is None:
return None
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(self.pos)
if not deltas:
return
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
yield self.store.update_stats_stream_pos(self.pos)
event_processing_positions.labels("stats").set(self.pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""
Called with the state deltas to process
"""
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
stream_pos = delta["stream_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
token = yield self.store.get_earliest_token_for_room_stats(room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
if token is not None and token >= stream_id:
logger.debug(
"Ignoring: %s as earlier than this room's initial ingestion event",
event_id,
)
continue
if event_id is None and prev_event_id is None:
# Errr...
continue
event_content = {}
if event_id is not None:
event = yield self.store.get_event(event_id, allow_none=True)
if event:
event_content = event.content or {}
# We use stream_pos here rather than fetch by event_id as event_id
# may be None
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
# quantise time to the nearest bucket
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
prev_event_content = {}
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
if prev_membership == membership:
continue
if prev_membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", -1
)
elif prev_membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", -1
)
elif prev_membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", -1
)
elif prev_membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", -1
)
else:
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
logger.error(err)
raise ValueError(err)
if membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", +1
)
elif membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", +1
)
elif membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", +1
)
elif membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", +1
)
else:
err = "%s is not a valid membership" % (repr(membership),)
logger.error(err)
raise ValueError(err)
user_id = state_key
if self.is_mine_id(user_id):
# update user_stats as it's one of our users
public = yield self._is_public_room(room_id)
if membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
-1,
)
elif membership == Membership.JOIN:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
+1,
)
elif typ == EventTypes.Create:
# Newly created room. Add it with all blank portions.
yield self.store.update_room_state(
room_id,
{
"join_rules": None,
"history_visibility": None,
"encryption": None,
"name": None,
"topic": None,
"avatar": None,
"canonical_alias": None,
},
)
elif typ == EventTypes.JoinRules:
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.RoomHistoryVisibility:
yield self.store.update_room_state(
room_id,
{"history_visibility": event_content.get("history_visibility")},
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.Encryption:
yield self.store.update_room_state(
room_id, {"encryption": event_content.get("algorithm")}
)
elif typ == EventTypes.Name:
yield self.store.update_room_state(
room_id, {"name": event_content.get("name")}
)
elif typ == EventTypes.Topic:
yield self.store.update_room_state(
room_id, {"topic": event_content.get("topic")}
)
elif typ == EventTypes.RoomAvatar:
yield self.store.update_room_state(
room_id, {"avatar": event_content.get("url")}
)
elif typ == EventTypes.CanonicalAlias:
yield self.store.update_room_state(
room_id, {"canonical_alias": event_content.get("alias")}
)
@defer.inlineCallbacks
def update_public_room_stats(self, ts, room_id, is_public):
"""
Increment/decrement a user's number of public rooms when a room they are
in changes to/from public visibility.
Args:
ts (int): Timestamp in seconds
room_id (str)
is_public (bool)
"""
# For now, blindly iterate over all local users in the room so that
# we can handle the whole problem of copying buckets over as needed
user_ids = yield self.store.get_users_in_room(room_id)
for user_id in user_ids:
if self.hs.is_mine(UserID.from_string(user_id)):
yield self.store.update_stats_delta(
ts, "user", user_id, "public_rooms", +1 if is_public else -1
)
yield self.store.update_stats_delta(
ts, "user", user_id, "private_rooms", -1 if is_public else +1
)
@defer.inlineCallbacks
def _is_public_room(self, room_id):
join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
history_visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility
)
if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
(
history_visibility
and history_visibility.content.get("history_visibility")
== "world_readable"
)
):
return True
else:
return False
pass

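For reference, the membership bookkeeping removed above reduces to a pair of ±1 deltas per state transition. A self-contained distillation of that logic (a sketch only: `membership_deltas` is a name invented for this illustration, and the string values mirror synapse.api.constants.Membership):

# Sketch of the removed handler's membership accounting.
DELTA_FIELD = {
    "join": "joined_members",
    "invite": "invited_members",
    "leave": "left_members",
    "ban": "banned_members",
}

def membership_deltas(prev_membership, membership):
    """Return the {field: delta} pairs the handler fed to update_stats_delta."""
    if prev_membership == membership:
        return {}
    deltas = {}
    for value, sign in ((prev_membership, -1), (membership, +1)):
        if value not in DELTA_FIELD:
            raise ValueError("%r is not a valid membership" % (value,))
        field = DELTA_FIELD[value]
        deltas[field] = deltas.get(field, 0) + sign
    return deltas

assert membership_deltas("leave", "join") == {"left_members": -1, "joined_members": +1}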

@@ -2270,8 +2270,9 @@ class EventsStore(
             "room_aliases",
             "room_depth",
             "room_memberships",
-            "room_state",
-            "room_stats",
+            "room_stats_state",
+            "room_stats_current",
+            "room_stats_historical",
             "room_stats_earliest_token",
             "rooms",
             "stream_ordering_to_exterm",


@@ -0,0 +1,142 @@
/* Copyright 2018 New Vector Ltd
* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
----- First clean up from previous versions of room stats.
-- First remove old stats stuff
DROP TABLE IF EXISTS room_stats;
DROP TABLE IF EXISTS user_stats;
DROP TABLE IF EXISTS room_stats_earliest_tokens;
DROP TABLE IF EXISTS _temp_populate_stats_position;
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
DROP TABLE IF EXISTS stats_stream_pos;
-- Unschedule old background updates if they're still scheduled
DELETE FROM background_updates WHERE update_name IN (
'populate_stats_createtables',
'populate_stats_process_rooms',
'populate_stats_cleanup'
);
----- Create tables for our version of room stats.
-- single-row table to track position of incremental updates
CREATE TABLE IF NOT EXISTS stats_incremental_position (
-- the stream_id of the last-processed state delta
state_delta_stream_id BIGINT,
-- the stream_ordering of the last-processed backfilled event
-- (this is negative)
total_events_min_stream_ordering BIGINT,
-- the stream_ordering of the last-processed normally-created event
-- (this is positive)
total_events_max_stream_ordering BIGINT,
-- If true, this represents the contract agreed upon by the stats
-- regenerator.
-- If false, this is suitable for use by the delta/incremental processor.
is_background_contract BOOLEAN NOT NULL PRIMARY KEY
);
-- insert a null row and make sure it is the only one.
DELETE FROM stats_incremental_position;
INSERT INTO stats_incremental_position (
state_delta_stream_id,
total_events_min_stream_ordering,
total_events_max_stream_ordering,
is_background_contract
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
-- represents PRESENT room statistics for a room
-- only holds absolute fields
CREATE TABLE IF NOT EXISTS room_stats_current (
room_id TEXT NOT NULL PRIMARY KEY,
current_state_events INT NOT NULL,
total_events INT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
-- If initial stats regen is still to be performed: NULL
-- If initial stats regen has been performed: the maximum delta stream
-- position that this row takes into account.
completed_delta_stream_id BIGINT
);
-- represents HISTORICAL room statistics for a room
CREATE TABLE IF NOT EXISTS room_stats_historical (
room_id TEXT NOT NULL,
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
-- Note that end_ts is quantised.
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
current_state_events INT NOT NULL,
total_events INT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
PRIMARY KEY (room_id, end_ts)
);
-- We use this index to speed up deletion of ancient room stats.
CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular room.)
-- represents PRESENT statistics for a user
-- only holds absolute fields
CREATE TABLE IF NOT EXISTS user_stats_current (
user_id TEXT NOT NULL PRIMARY KEY,
public_rooms INT NOT NULL,
private_rooms INT NOT NULL,
-- If initial stats regen is still to be performed: NULL
-- If initial stats regen has been performed: the maximum delta stream
-- position that this row takes into account.
completed_delta_stream_id BIGINT
);
-- represents HISTORICAL statistics for a user
CREATE TABLE IF NOT EXISTS user_stats_historical (
user_id TEXT NOT NULL,
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
public_rooms INT NOT NULL,
private_rooms INT NOT NULL,
PRIMARY KEY (user_id, end_ts)
);
-- We use this index to speed up deletion of ancient user stats.
CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular user.)
-- Also rename room_state to room_stats_state to make its ownership clear.
ALTER TABLE room_state RENAME TO room_stats_state;

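The bucket arithmetic encoded by `end_ts` and `bucket_size` is worth making concrete. A minimal sketch, assuming millisecond timestamps throughout (per the config change earlier in this diff); `quantise` mirrors the `quantise_stats_time` helper added to synapse/storage/stats.py below:

# Sketch of the historical-bucket arithmetic described by the schema comments.
def quantise(ts, bucket_size):
    # largest multiple of bucket_size that is no later than ts
    return (ts // bucket_size) * bucket_size

bucket_size = 86400 * 1000           # one day, in ms
ts = 1_566_300_000_000               # an example event timestamp, in ms
end_ts = quantise(ts, bucket_size) + bucket_size

# the historical row keyed on end_ts covers (end_ts - bucket_size) ... end_ts
assert end_ts - bucket_size <= ts < end_ts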

@@ -0,0 +1,24 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- These partial indices help us find incomplete stats rows
CREATE INDEX IF NOT EXISTS room_stats_not_complete
ON room_stats_current (room_id)
WHERE completed_delta_stream_id IS NULL;
CREATE INDEX IF NOT EXISTS user_stats_not_complete
ON user_stats_current (user_id)
WHERE completed_delta_stream_id IS NULL;

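A hedged sketch of the lookup these partial indexes exist to serve; the helper name is invented for illustration, and the `?` placeholder follows the parameter style used elsewhere in this diff:

# Hypothetical helper (not in the diff): only rows whose
# completed_delta_stream_id is NULL appear in the partial index at all,
# so this "find incomplete rows" scan stays cheap.
def _get_incomplete_room_ids_txn(txn, limit=100):
    txn.execute(
        """
        SELECT room_id FROM room_stats_current
        WHERE completed_delta_stream_id IS NULL
        LIMIT ?
        """,
        (limit,),
    )
    return [row[0] for row in txn.fetchall()]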

@@ -0,0 +1,27 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- even though SQLite >= 3.8 can support partial indices, we won't enable
-- them, in case the SQLite database may be later used on another system.
-- It's also the case that SQLite is only likely to be used in small
-- deployments or testing, where the optimisations gained by use of a
-- partial index are not a big concern.
CREATE INDEX IF NOT EXISTS room_stats_not_complete
ON room_stats_current (completed_delta_stream_id, room_id);
CREATE INDEX IF NOT EXISTS user_stats_not_complete
ON user_stats_current (completed_delta_stream_id, user_id);


@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,17 +15,16 @@
# limitations under the License.
import logging
from itertools import chain
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.storage.prepare_database import get_statements
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
# these fields track absolutes (e.g. total number of rooms on the server)
# You can think of these as Prometheus Gauges.
# You can draw these stats on a line graph.
# Example: number of users in a room
ABSOLUTE_STATS_FIELDS = {
"room": (
"current_state_events",
@@ -32,14 +32,17 @@ ABSOLUTE_STATS_FIELDS = {
         "invited_members",
         "left_members",
         "banned_members",
-        "state_events",
+        "total_events",
     ),
     "user": ("public_rooms", "private_rooms"),
 }
 
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+# You can draw these stats on a histogram.
+# Example: number of events sent locally during a time slice
+PER_SLICE_FIELDS = {"room": (), "user": ()}
 
 TEMP_TABLE = "_temp_populate_stats"
 
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
class StatsStore(StateDeltasStore):
@@ -51,292 +54,25 @@ class StatsStore(StateDeltasStore):
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
-        self.register_background_update_handler(
-            "populate_stats_createtables", self._populate_stats_createtables
-        )
-        self.register_background_update_handler(
-            "populate_stats_process_rooms", self._populate_stats_process_rooms
-        )
-        self.register_background_update_handler(
-            "populate_stats_cleanup", self._populate_stats_cleanup
-        )
+        self.register_noop_background_update("populate_stats_createtables")
+        self.register_noop_background_update("populate_stats_process_rooms")
+        self.register_noop_background_update("populate_stats_cleanup")
@defer.inlineCallbacks
def _populate_stats_createtables(self, progress, batch_size):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_createtables")
return 1
# Get all the rooms that we want to process.
def _make_staging_area(txn):
# Create the temporary tables
stmts = get_statements(
"""
-- We just recreate the table, we'll be reinserting the
-- correct entries again later anyway.
DROP TABLE IF EXISTS {temp}_rooms;
CREATE TABLE IF NOT EXISTS {temp}_rooms(
room_id TEXT NOT NULL,
events BIGINT NOT NULL
);
CREATE INDEX {temp}_rooms_events
ON {temp}_rooms(events);
CREATE INDEX {temp}_rooms_id
ON {temp}_rooms(room_id);
""".format(
temp=TEMP_TABLE
).splitlines()
)
for statement in stmts:
txn.execute(statement)
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_position(position TEXT NOT NULL)"
)
txn.execute(sql)
# Get rooms we want to process from the database, only adding
# those that we haven't (i.e. those not in room_stats_earliest_token)
sql = """
INSERT INTO %s_rooms (room_id, events)
SELECT c.room_id, count(*) FROM current_state_events AS c
LEFT JOIN room_stats_earliest_token AS t USING (room_id)
WHERE t.room_id IS NULL
GROUP BY c.room_id
""" % (
TEMP_TABLE,
)
txn.execute(sql)
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
self.get_earliest_token_for_room_stats.invalidate_all()
yield self._end_background_update("populate_stats_createtables")
return 1
    def quantise_stats_time(self, ts):
        """
        Quantises a timestamp to be a multiple of the bucket size.

        Args:
            ts (int): the timestamp to quantise, in milliseconds since the Unix
                Epoch

        Returns:
            int: a timestamp which
              - is divisible by the bucket size;
              - is no later than `ts`; and
              - is the largest such timestamp.
        """
        return (ts // self.stats_bucket_size) * self.stats_bucket_size

    @defer.inlineCallbacks
    def _populate_stats_cleanup(self, progress, batch_size):
        """
        Update the user directory stream position, then clean up the old tables.
        """
        if not self.stats_enabled:
            yield self._end_background_update("populate_stats_cleanup")
            return 1
position = yield self._simple_select_one_onecol(
TEMP_TABLE + "_position", None, "position"
)
yield self.update_stats_stream_pos(position)
def _delete_staging_area(txn):
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
yield self._end_background_update("populate_stats_cleanup")
return 1
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms")
return 1
# If we don't have progress filed, delete everything.
if not progress:
yield self.delete_all_stats()
def _get_next_batch(txn):
# Only fetch 250 rooms, so we don't fetch too many at once, even
# if those 250 rooms have less than batch_size state events.
sql = """
SELECT room_id, events FROM %s_rooms
ORDER BY events DESC
LIMIT 250
""" % (
TEMP_TABLE,
)
txn.execute(sql)
rooms_to_work_on = txn.fetchall()
if not rooms_to_work_on:
return None
# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
progress["remaining"] = txn.fetchone()[0]
return rooms_to_work_on
rooms_to_work_on = yield self.runInteraction(
"populate_stats_temp_read", _get_next_batch
)
# No more rooms -- complete the transaction.
if not rooms_to_work_on:
yield self._end_background_update("populate_stats_process_rooms")
return 1
logger.info(
"Processing the next %d rooms of %d remaining",
len(rooms_to_work_on),
progress["remaining"],
)
# Number of state events we've processed by going through each room
processed_event_count = 0
for room_id, event_count in rooms_to_work_on:
current_state_ids = yield self.get_current_state_ids(room_id)
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
history_visibility_id = current_state_ids.get(
(EventTypes.RoomHistoryVisibility, "")
)
encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
name_id = current_state_ids.get((EventTypes.Name, ""))
topic_id = current_state_ids.get((EventTypes.Topic, ""))
avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
event_ids = [
join_rules_id,
history_visibility_id,
encryption_id,
name_id,
topic_id,
avatar_id,
canonical_alias_id,
]
state_events = yield self.get_events(
[ev for ev in event_ids if ev is not None]
)
def _get_or_none(event_id, arg):
event = state_events.get(event_id)
if event:
return event.content.get(arg)
return None
yield self.update_room_state(
room_id,
{
"join_rules": _get_or_none(join_rules_id, "join_rule"),
"history_visibility": _get_or_none(
history_visibility_id, "history_visibility"
),
"encryption": _get_or_none(encryption_id, "algorithm"),
"name": _get_or_none(name_id, "name"),
"topic": _get_or_none(topic_id, "topic"),
"avatar": _get_or_none(avatar_id, "url"),
"canonical_alias": _get_or_none(canonical_alias_id, "alias"),
},
)
now = self.hs.get_reactor().seconds()
# quantise time to the nearest bucket
now = (now // self.stats_bucket_size) * self.stats_bucket_size
def _fetch_data(txn):
# Get the current token of the room
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
current_state_events = len(current_state_ids)
membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
total_state_events = self._get_total_state_event_counts_txn(
txn, room_id
)
self._update_stats_txn(
txn,
"room",
room_id,
now,
{
"bucket_size": self.stats_bucket_size,
"current_state_events": current_state_events,
"joined_members": membership_counts.get(Membership.JOIN, 0),
"invited_members": membership_counts.get(Membership.INVITE, 0),
"left_members": membership_counts.get(Membership.LEAVE, 0),
"banned_members": membership_counts.get(Membership.BAN, 0),
"state_events": total_state_events,
},
)
self._simple_insert_txn(
txn,
"room_stats_earliest_token",
{"room_id": room_id, "token": current_token},
)
# We've finished a room. Delete it from the table.
self._simple_delete_one_txn(
txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
)
yield self.runInteraction("update_room_stats", _fetch_data)
# Update the remaining counter.
progress["remaining"] -= 1
yield self.runInteraction(
"populate_stats",
self._background_update_progress_txn,
"populate_stats_process_rooms",
progress,
)
processed_event_count += event_count
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
return processed_event_count
return processed_event_count
def delete_all_stats(self):
"""
Delete all statistics records.
"""
def _delete_all_stats_txn(txn):
txn.execute("DELETE FROM room_state")
txn.execute("DELETE FROM room_stats")
txn.execute("DELETE FROM room_stats_earliest_token")
txn.execute("DELETE FROM user_stats")
return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
def get_stats_stream_pos(self):
return self._simple_select_one_onecol(
table="stats_stream_pos",
keyvalues={},
retcol="stream_id",
desc="stats_stream_pos",
)
def update_stats_stream_pos(self, stream_id):
return self._simple_update_one(
table="stats_stream_pos",
keyvalues={},
updatevalues={"stream_id": stream_id},
desc="update_stats_stream_pos",
)
def update_room_state(self, room_id, fields):
"""
@@ -361,124 +97,271 @@ class StatsStore(StateDeltasStore):
fields[col] = None
return self._simple_upsert(
table="room_state",
table="room_stats_state",
keyvalues={"room_id": room_id},
values=fields,
desc="update_room_state",
)
    def get_deltas_for_room(self, room_id, start, size=100):
        """
        Get statistics deltas for a given room.

        Args:
            room_id (str)
            start (int): Pagination start. Number of entries, not timestamp.
            size (int): How many entries to return.

        Returns:
            Deferred[list[dict]], where the dict has the keys of
            ABSOLUTE_STATS_FIELDS["room"] and "ts".
        """
        return self._simple_select_list_paginate(
            "room_stats",
            {"room_id": room_id},
            "ts",
            start,
            size,
            retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
            order_direction="DESC",
        )

    def update_stats_delta(
        self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
    ):
        """
        Updates the statistics for a subject, with a delta (difference/relative
        change).

        Args:
            ts (int): timestamp of the change
            stats_type (str): "room" or "user" - the kind of subject
            stats_id (str): the subject's ID (room ID or user ID)
            fields (dict[str, int]): Deltas of stats values.
            complete_with_stream_id (int, optional):
                If supplied, converts an incomplete row into a complete row,
                with the supplied stream_id marked as the stream_id where the
                row was completed.
        """
        return self.runInteraction(
            "update_stats_delta",
            self._update_stats_delta_txn,
            ts,
            stats_type,
            stats_id,
            fields,
            complete_with_stream_id=complete_with_stream_id,
        )
    def get_all_room_state(self):
        return self._simple_select_list(
            "room_state", None, retcols=("name", "topic", "canonical_alias")
        )

    @cached()
    def get_earliest_token_for_room_stats(self, room_id):
        """
        Fetch the "earliest token". This is used by the room stats delta
        processor to ignore deltas that have been processed between the
        start of the background task and any particular room's stats
        being calculated.

        Returns:
            Deferred[int]
        """
        return self._simple_select_one_onecol(
            "room_stats_earliest_token",
            {"room_id": room_id},
            retcol="token",
            allow_none=True,
        )

    def update_stats(self, stats_type, stats_id, ts, fields):
        table, id_col = TYPE_TO_ROOM[stats_type]
        return self._simple_upsert(
            table=table,
            keyvalues={id_col: stats_id, "ts": ts},
            values=fields,
            desc="update_stats",
        )

    def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
        table, id_col = TYPE_TO_ROOM[stats_type]
        return self._simple_upsert_txn(
            txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
        )

    def update_stats_delta(self, ts, stats_type, stats_id, field, value):
        def _update_stats_delta(txn):
            table, id_col = TYPE_TO_ROOM[stats_type]
            sql = (
                "SELECT * FROM %s"
                " WHERE %s=? and ts=("
                " SELECT MAX(ts) FROM %s"
                " WHERE %s=?"
                ")"
            ) % (table, id_col, table, id_col)
            txn.execute(sql, (stats_id, stats_id))
            rows = self.cursor_to_dict(txn)
            if len(rows) == 0:
                # silently skip as we don't have anything to apply a delta to yet.
                # this tries to minimise any race between the initial sync and
                # subsequent deltas arriving.
                return

            current_ts = ts
            latest_ts = rows[0]["ts"]
            if current_ts < latest_ts:
                # This one is in the past, but we're just encountering it now.
                # Mark it as part of the current bucket.
                current_ts = latest_ts
            elif ts != latest_ts:
                # we have to copy our absolute counters over to the new entry.
                values = {
                    key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
                }
                values[id_col] = stats_id
                values["ts"] = ts
                values["bucket_size"] = self.stats_bucket_size

                self._simple_insert_txn(txn, table=table, values=values)

            # actually update the new value
            if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
                self._simple_update_txn(
                    txn,
                    table=table,
                    keyvalues={id_col: stats_id, "ts": current_ts},
                    updatevalues={field: value},
                )
            else:
                sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
                    table,
                    field,
                    field,
                    id_col,
                )
                txn.execute(sql, (value, stats_id, current_ts))

        return self.runInteraction("update_stats_delta", _update_stats_delta)

    def _update_stats_delta_txn(
        self,
        txn,
        ts,
        stats_type,
        stats_id,
        fields,
        complete_with_stream_id=None,
        absolute_field_overrides=None,
    ):
        """
        See L{update_stats_delta}

        Additional Args:
            absolute_field_overrides (dict[str, int]): Current stats values
                (i.e. not deltas) of absolute fields.
                Does not work with per-slice fields.
        """
        table, id_col = TYPE_TO_TABLE[stats_type]

        quantised_ts = self.quantise_stats_time(int(ts))
        end_ts = quantised_ts + self.stats_bucket_size
        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
        slice_field_names = PER_SLICE_FIELDS[stats_type]

        if absolute_field_overrides is None:
            absolute_field_overrides = {}

        for field in chain(fields.keys(), absolute_field_overrides.keys()):
            if field not in abs_field_names and field not in slice_field_names:
                # guard against potential SQL injection dodginess
                raise ValueError(
                    "%s is not a recognised field"
                    " for stats type %s" % (field, stats_type)
                )

        # only absolute stats fields are tracked in the `_current` stats tables,
        # so those are the only ones that we process deltas for when
        # we upsert against the `_current` table.

        # This calculates the deltas (`field = field + ?` values) for absolute
        # fields:
        # * defaulting to 0 if not specified
        #   (required for the INSERT part of upserting to work)
        # * omitting overrides specified in `absolute_field_overrides`
        deltas_of_absolute_fields = {
            key: fields.get(key, 0)
            for key in abs_field_names
            if key not in absolute_field_overrides
        }

        if complete_with_stream_id is not None:
            absolute_field_overrides = absolute_field_overrides.copy()
            absolute_field_overrides[
                "completed_delta_stream_id"
            ] = complete_with_stream_id

        # first upsert the `_current` table
        self._upsert_with_additive_relatives_txn(
            txn=txn,
            table=table + "_current",
            keyvalues={id_col: stats_id},
            absolutes=absolute_field_overrides,
            additive_relatives=deltas_of_absolute_fields,
        )

        if self.has_completed_background_updates():
            # TODO want to check specifically for stats regenerator, not all
            # background updates…
            # then upsert the `_historical` table.
            # we don't support absolute_fields for per-slice fields as it makes
            # no sense.
            per_slice_additive_relatives = {
                key: fields.get(key, 0) for key in slice_field_names
            }
            self._upsert_copy_from_table_with_additive_relatives_txn(
                txn=txn,
                into_table=table + "_historical",
                keyvalues={id_col: stats_id},
                extra_dst_keyvalues={
                    "end_ts": end_ts,
                    "bucket_size": self.stats_bucket_size,
                },
                additive_relatives=per_slice_additive_relatives,
                src_table=table + "_current",
                copy_columns=abs_field_names,
                additional_where=" AND completed_delta_stream_id IS NOT NULL",
            )
    def _upsert_with_additive_relatives_txn(
        self, txn, table, keyvalues, absolutes, additive_relatives
    ):
        """Used to update values in the stats tables.

        Args:
            txn: Transaction
            table (str): Table name
            keyvalues (dict[str, any]): Row-identifying key values
            absolutes (dict[str, any]): Absolute (set) fields
            additive_relatives (dict[str, int]): Fields that will be added onto
                if existing row present.
        """
        if self.database_engine.can_native_upsert:
            absolute_updates = [
                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
                for field in absolutes.keys()
            ]
            relative_updates = [
                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
                % {"table": table, "field": field}
                for field in additive_relatives.keys()
            ]

            insert_cols = []
            qargs = []
            for (key, val) in chain(
                keyvalues.items(), absolutes.items(), additive_relatives.items()
            ):
                insert_cols.append(key)
                qargs.append(val)

            sql = """
                INSERT INTO %(table)s (%(insert_cols_cs)s)
                VALUES (%(insert_vals_qs)s)
                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
            """ % {
                "table": table,
                "insert_cols_cs": ", ".join(insert_cols),
                "insert_vals_qs": ", ".join(
                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
                ),
                "key_columns": ", ".join(keyvalues),
                "updates": ", ".join(chain(absolute_updates, relative_updates)),
            }
            txn.execute(sql, qargs)
        else:
            self.database_engine.lock_table(txn, table)
            retcols = chain(absolutes.keys(), additive_relatives.keys())
            current_row = self._simple_select_one_txn(
                txn, table, keyvalues, retcols, allow_none=True
            )
            if current_row is None:
                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
                self._simple_insert_txn(txn, table, merged_dict)
            else:
                for (key, val) in additive_relatives.items():
                    current_row[key] += val
                current_row.update(absolutes)
                self._simple_update_one_txn(txn, table, keyvalues, current_row)
    def _upsert_copy_from_table_with_additive_relatives_txn(
        self,
        txn,
        into_table,
        keyvalues,
        extra_dst_keyvalues,
        additive_relatives,
        src_table,
        copy_columns,
        additional_where="",
    ):
        """
        Args:
            txn: Transaction
            into_table (str): The destination table to UPSERT the row into
            keyvalues (dict[str, any]): Row-identifying key values
            extra_dst_keyvalues (dict[str, any]): Additional keyvalues
                for `into_table`.
            additive_relatives (dict[str, any]): Fields that will be added onto
                if existing row present. (Must be disjoint from copy_columns.)
            src_table (str): The source table to copy from
            copy_columns (iterable[str]): The list of columns to copy
            additional_where (str): Additional SQL for where (prefix with AND
                if using).
        """
        if self.database_engine.can_native_upsert:
            ins_columns = chain(
                keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
            )
            sel_exprs = chain(
                keyvalues,
                copy_columns,
                ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
            )
            keyvalues_where = ("%s = ?" % f for f in keyvalues)

            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
            sets_ar = (
                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
                for f in additive_relatives
            )

            sql = """
                INSERT INTO %(into_table)s (%(ins_columns)s)
                SELECT %(sel_exprs)s
                FROM %(src_table)s
                WHERE %(keyvalues_where)s %(additional_where)s
                ON CONFLICT (%(keyvalues)s)
                DO UPDATE SET %(sets)s
            """ % {
                "into_table": into_table,
                "ins_columns": ", ".join(ins_columns),
                "sel_exprs": ", ".join(sel_exprs),
                "keyvalues_where": " AND ".join(keyvalues_where),
                "src_table": src_table,
                "keyvalues": ", ".join(
                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
                ),
                "sets": ", ".join(chain(sets_cc, sets_ar)),
                "additional_where": additional_where,
            }

            qargs = list(
                chain(
                    additive_relatives.values(),
                    extra_dst_keyvalues.values(),
                    keyvalues.values(),
                )
            )
            txn.execute(sql, qargs)
        else:
            self.database_engine.lock_table(txn, into_table)
            src_row = self._simple_select_one_txn(
                txn, src_table, keyvalues, copy_columns
            )
            dest_current_row = self._simple_select_one_txn(
                txn,
                into_table,
                keyvalues,
                chain(additive_relatives.keys(), copy_columns),
                allow_none=True,
            )
            if dest_current_row is None:
                merged_dict = {**keyvalues, **src_row, **additive_relatives}
                self._simple_insert_txn(txn, into_table, merged_dict)
            else:
                for (key, val) in additive_relatives.items():
                    src_row[key] = dest_current_row[key] + val
                self._simple_update_txn(txn, into_table, keyvalues, src_row)

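To make the `_upsert_with_additive_relatives_txn` contract concrete -- absolute fields overwrite, additive relatives accumulate -- here is a toy in-memory model (a dict stands in for the table; none of this is Synapse API):

# Toy model of the "additive relatives" upsert semantics described above.
def upsert_with_additive_relatives(table, key, absolutes, additive_relatives):
    row = table.get(key)
    if row is None:
        table[key] = {**absolutes, **additive_relatives}
    else:
        row.update(absolutes)                       # absolute fields: overwrite
        for field, delta in additive_relatives.items():
            row[field] = row.get(field, 0) + delta  # relative fields: add on

stats = {}
upsert_with_additive_relatives(stats, "!room:hs", {}, {"joined_members": 1})
upsert_with_additive_relatives(stats, "!room:hs", {}, {"joined_members": 1})
upsert_with_additive_relatives(stats, "!room:hs", {"completed_delta_stream_id": 42}, {})
assert stats["!room:hs"] == {"joined_members": 2, "completed_delta_stream_id": 42}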

@@ -1,304 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
class StatsRoomTests(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
self.handler = self.hs.get_stats_handler()
def _add_background_updates(self):
"""
Add the background updates we need to run.
"""
# Ugh, have to reset this flag
self.store._all_done = False
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
"depends_on": "populate_stats_createtables",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
def test_initial_room(self):
"""
The background updates will build the table from scratch.
"""
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 0)
# Disable stats
self.hs.config.stats_enabled = False
self.handler.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
)
# Stats disabled, shouldn't have done anything
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 0)
# Enable stats
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
# Do the initial population of the user directory via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["topic"], "foo")
def test_initial_earliest_token(self):
"""
Ingestion via notify_new_event will ignore tokens that the background
update have already processed.
"""
self.reactor.advance(86401)
self.hs.config.stats_enabled = False
self.handler.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
u2 = self.register_user("u2", "pass")
u2_token = self.login("u2", "pass")
u3 = self.register_user("u3", "pass")
u3_token = self.login("u3", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
)
# Begin the ingestion by creating the temp tables. This will also store
# the position that the deltas should begin at, once they take over.
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
self.store._all_done = False
self.get_success(self.store.update_stats_stream_pos(None))
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
)
)
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
# Now, before the table is actually ingested, add some more events.
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
self.helper.join(room=room_1, user=u2, tok=u2_token)
# Now do the initial ingestion.
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
self.store._all_done = False
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
self.reactor.advance(86401)
# Now add some more events, triggering ingestion. Because of the stream
# position being set to before the events sent in the middle, a simpler
# implementation would reprocess those events, and say there were four
# users, not three.
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
self.helper.join(room=room_1, user=u3, tok=u3_token)
# Get the deltas! There should be two -- day 1, and day 2.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
# The oldest has 2 joined members
self.assertEqual(r[-1]["joined_members"], 2)
# The newest has 3
self.assertEqual(r[0]["joined_members"], 3)
def test_incorrect_state_transition(self):
"""
If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
(JOIN, INVITE, LEAVE, BAN), an error is raised.
"""
events = {
"a1": {"membership": Membership.LEAVE},
"a2": {"membership": "not a real thing"},
}
def get_event(event_id, allow_none=True):
m = Mock()
m.content = events[event_id]
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d
def get_received_ts(event_id):
return defer.succeed(1)
self.store.get_received_ts = get_received_ts
self.store.get_event = get_event
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user",
"room_id": "room",
"event_id": "a1",
"prev_event_id": "a2",
"stream_id": 60,
}
]
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid prev_membership"
)
# And the other way...
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user",
"room_id": "room",
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": 100,
}
]
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid membership"
)
def test_redacted_prev_event(self):
"""
If the prev_event does not exist, then it is assumed to be a LEAVE.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
# Do the initial population of the user directory via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
events = {"a1": None, "a2": {"membership": Membership.JOIN}}
def get_event(event_id, allow_none=True):
if events.get(event_id):
m = Mock()
m.content = events[event_id]
else:
m = None
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d
def get_received_ts(event_id):
return defer.succeed(1)
self.store.get_received_ts = get_received_ts
self.store.get_event = get_event
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user:test",
"room_id": room_1,
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": 100,
}
]
# Handle our fake deltas, which has a user going from LEAVE -> JOIN.
self.get_success(self.handler._handle_deltas(deltas))
# One delta, with two joined members -- the room creator, and our fake
# user.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["joined_members"], 2)