mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-09 01:30:18 +00:00
Compare commits
64 Commits
madlittlem
...
rei/rss_in
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fca3a9c121 | ||
|
|
50c321d8f3 | ||
|
|
d39c09c284 | ||
|
|
4ecc62b0b6 | ||
|
|
b379a11dc2 | ||
|
|
0f2e59f4f1 | ||
|
|
065042c325 | ||
|
|
440c60ef8f | ||
|
|
893729a3a6 | ||
|
|
44b036779e | ||
|
|
4c13f2b282 | ||
|
|
9dbf42af8a | ||
|
|
60481031f2 | ||
|
|
f7ececb0ac | ||
|
|
39dbee2a3e | ||
|
|
d7a692f860 | ||
|
|
a13ad21abf | ||
|
|
bc2c284dbe | ||
|
|
3cdce28d3b | ||
|
|
81aa6d53b0 | ||
|
|
dfb22fec48 | ||
|
|
cc66cf1238 | ||
|
|
a344ad3d3f | ||
|
|
b9f1adc370 | ||
|
|
1af7866562 | ||
|
|
064143c130 | ||
|
|
324f21b216 | ||
|
|
10c1a233f9 | ||
|
|
44d3c2e80b | ||
|
|
07c267c516 | ||
|
|
62b1250629 | ||
|
|
11c4e506bd | ||
|
|
491eaf0808 | ||
|
|
dd8e6020d8 | ||
|
|
99c88ac84e | ||
|
|
3b09a37682 | ||
|
|
bc754cdeed | ||
|
|
c775f310e9 | ||
|
|
09cbc3a8e9 | ||
|
|
736ac58e11 | ||
|
|
a6c102009e | ||
|
|
544ba2c2e9 | ||
|
|
81c5289c83 | ||
|
|
4b7bf2e413 | ||
|
|
5043ef801a | ||
|
|
baeaf00a12 | ||
|
|
1ecd1a6a5f | ||
|
|
c3d2bf2807 | ||
|
|
79252d1c83 | ||
|
|
e8fc180d4d | ||
|
|
7b657f1148 | ||
|
|
18a4c03c50 | ||
|
|
eafa8d3c54 | ||
|
|
977310ee27 | ||
|
|
981c6cf544 | ||
|
|
6a19f7e101 | ||
|
|
4a97eef0dc | ||
|
|
b5573c0ffb | ||
|
|
1819563640 | ||
|
|
e4cbea6c46 | ||
|
|
80a1c6e9e5 | ||
|
|
d7675e79e1 | ||
|
|
8de9ebe35d | ||
|
|
8374bcb0a8 |
1
changelog.d/5879.misc
Normal file
1
changelog.d/5879.misc
Normal file
@@ -0,0 +1 @@
|
||||
Rework room and user statistics to separate current & historical rows, as well as track stats correctly.
|
||||
@@ -27,19 +27,16 @@ class StatsConfig(Config):
|
||||
|
||||
def read_config(self, config, **kwargs):
|
||||
self.stats_enabled = True
|
||||
self.stats_bucket_size = 86400
|
||||
self.stats_bucket_size = 86400 * 1000
|
||||
self.stats_retention = sys.maxsize
|
||||
stats_config = config.get("stats", None)
|
||||
if stats_config:
|
||||
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
|
||||
self.stats_bucket_size = (
|
||||
self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
|
||||
self.stats_bucket_size = self.parse_duration(
|
||||
stats_config.get("bucket_size", "1d")
|
||||
)
|
||||
self.stats_retention = (
|
||||
self.parse_duration(
|
||||
stats_config.get("retention", "%ds" % (sys.maxsize,))
|
||||
)
|
||||
/ 1000
|
||||
self.stats_retention = self.parse_duration(
|
||||
stats_config.get("retention", "%ds" % (sys.maxsize,))
|
||||
)
|
||||
|
||||
def generate_config_section(self, config_dir_path, server_name, **kwargs):
|
||||
|
||||
@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
|
||||
# The current position in the current_state_delta stream
|
||||
self.pos = None
|
||||
|
||||
# Guard to ensure we only process deltas one at a time
|
||||
self._is_processing = False
|
||||
|
||||
if hs.config.stats_enabled:
|
||||
self.notifier.add_replication_callback(self.notify_new_event)
|
||||
|
||||
@@ -65,43 +62,60 @@ class StatsHandler(StateDeltasHandler):
|
||||
if not self.hs.config.stats_enabled:
|
||||
return
|
||||
|
||||
if self._is_processing:
|
||||
return
|
||||
lock = self.store.stats_delta_processing_lock
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def process():
|
||||
yield lock.acquire()
|
||||
try:
|
||||
yield self._unsafe_process()
|
||||
finally:
|
||||
self._is_processing = False
|
||||
yield lock.release()
|
||||
|
||||
self._is_processing = True
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
if not lock.locked:
|
||||
# we only want to run this process one-at-a-time,
|
||||
# and also, if the initial background updater wants us to keep out,
|
||||
# we should respect that.
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _unsafe_process(self):
|
||||
# If self.pos is None then means we haven't fetched it from DB
|
||||
if self.pos is None:
|
||||
self.pos = yield self.store.get_stats_stream_pos()
|
||||
# If None is one of the values, then means that the stats regenerator has not (or had not) yet unwedged us
|
||||
# but note that this might be outdated, so we retrieve the positions again.
|
||||
if self.pos is None or None in self.pos.values():
|
||||
self.pos = yield self.store.get_stats_positions()
|
||||
|
||||
# If still None then the initial background update hasn't happened yet
|
||||
if self.pos is None:
|
||||
# If still contains a None position, then the stats regenerator hasn't started yet
|
||||
if None in self.pos.values():
|
||||
return None
|
||||
|
||||
# Loop round handling deltas until we're up to date
|
||||
|
||||
while True:
|
||||
with Measure(self.clock, "stats_delta"):
|
||||
deltas = yield self.store.get_current_state_deltas(self.pos)
|
||||
if not deltas:
|
||||
return
|
||||
deltas = yield self.store.get_current_state_deltas(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
|
||||
logger.info("Handling %d state deltas", len(deltas))
|
||||
logger.debug("Handling %d state deltas", len(deltas))
|
||||
yield self._handle_deltas(deltas)
|
||||
|
||||
self.pos = deltas[-1]["stream_id"]
|
||||
yield self.store.update_stats_stream_pos(self.pos)
|
||||
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
|
||||
yield self.store.update_stats_positions(self.pos)
|
||||
|
||||
event_processing_positions.labels("stats").set(self.pos)
|
||||
event_processing_positions.labels("stats").set(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
|
||||
# Then count deltas for total_events and total_event_bytes.
|
||||
with Measure(self.clock, "stats_total_events_and_bytes"):
|
||||
self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes(
|
||||
self.pos
|
||||
)
|
||||
|
||||
if not deltas and not had_counts:
|
||||
break
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_deltas(self, deltas):
|
||||
@@ -119,7 +133,7 @@ class StatsHandler(StateDeltasHandler):
|
||||
|
||||
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
|
||||
|
||||
token = yield self.store.get_earliest_token_for_room_stats(room_id)
|
||||
token = yield self.store.get_earliest_token_for_stats("room", room_id)
|
||||
|
||||
# If the earliest token to begin from is larger than our current
|
||||
# stream ID, skip processing this delta.
|
||||
@@ -131,7 +145,10 @@ class StatsHandler(StateDeltasHandler):
|
||||
continue
|
||||
|
||||
if event_id is None and prev_event_id is None:
|
||||
# Errr...
|
||||
logger.error(
|
||||
"event ID is None and so is the previous event ID. stream_id: %s",
|
||||
stream_id,
|
||||
)
|
||||
continue
|
||||
|
||||
event_content = {}
|
||||
@@ -143,92 +160,87 @@ class StatsHandler(StateDeltasHandler):
|
||||
|
||||
# We use stream_pos here rather than fetch by event_id as event_id
|
||||
# may be None
|
||||
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
|
||||
stream_timestamp = yield self.store.get_received_ts_by_stream_pos(
|
||||
stream_pos
|
||||
)
|
||||
stream_timestamp = int(stream_timestamp)
|
||||
|
||||
# quantise time to the nearest bucket
|
||||
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
|
||||
# All the values in this dict are deltas (RELATIVE changes)
|
||||
room_stats_delta = {}
|
||||
is_newly_created = False
|
||||
|
||||
if prev_event_id is None:
|
||||
# this state event doesn't overwrite another,
|
||||
# so it is a new effective/current state event
|
||||
room_stats_delta["current_state_events"] = 1
|
||||
|
||||
if typ == EventTypes.Member:
|
||||
# we could use _get_key_change here but it's a bit inefficient
|
||||
# given we're not testing for a specific result; might as well
|
||||
# just grab the prev_membership and membership strings and
|
||||
# compare them.
|
||||
prev_event_content = {}
|
||||
# We take None rather than leave as a previous membership
|
||||
# in the absence of a previous event because we do not want to
|
||||
# reduce the leave count when a new-to-the-room user joins.
|
||||
prev_membership = None
|
||||
if prev_event_id is not None:
|
||||
prev_event = yield self.store.get_event(
|
||||
prev_event_id, allow_none=True
|
||||
)
|
||||
if prev_event:
|
||||
prev_event_content = prev_event.content
|
||||
prev_membership = prev_event_content.get(
|
||||
"membership", Membership.LEAVE
|
||||
)
|
||||
|
||||
membership = event_content.get("membership", Membership.LEAVE)
|
||||
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
|
||||
|
||||
if prev_membership == membership:
|
||||
continue
|
||||
|
||||
if prev_membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", -1
|
||||
)
|
||||
if prev_membership is None:
|
||||
logger.debug("No previous membership for this user.")
|
||||
elif membership == prev_membership:
|
||||
pass # noop
|
||||
elif prev_membership == Membership.JOIN:
|
||||
room_stats_delta["joined_members"] = -1
|
||||
elif prev_membership == Membership.INVITE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "invited_members", -1
|
||||
)
|
||||
room_stats_delta["invited_members"] = -1
|
||||
elif prev_membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "left_members", -1
|
||||
)
|
||||
room_stats_delta["left_members"] = -1
|
||||
elif prev_membership == Membership.BAN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "banned_members", -1
|
||||
)
|
||||
room_stats_delta["banned_members"] = -1
|
||||
else:
|
||||
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
|
||||
logger.error(err)
|
||||
raise ValueError(err)
|
||||
raise ValueError(
|
||||
"%r is not a valid prev_membership" % (prev_membership,)
|
||||
)
|
||||
|
||||
if membership == prev_membership:
|
||||
pass # noop
|
||||
if membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", +1
|
||||
)
|
||||
room_stats_delta["joined_members"] = +1
|
||||
elif membership == Membership.INVITE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "invited_members", +1
|
||||
)
|
||||
room_stats_delta["invited_members"] = +1
|
||||
elif membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "left_members", +1
|
||||
)
|
||||
room_stats_delta["left_members"] = +1
|
||||
elif membership == Membership.BAN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "banned_members", +1
|
||||
)
|
||||
room_stats_delta["banned_members"] = +1
|
||||
else:
|
||||
err = "%s is not a valid membership" % (repr(membership),)
|
||||
logger.error(err)
|
||||
raise ValueError(err)
|
||||
raise ValueError("%r is not a valid membership" % (membership,))
|
||||
|
||||
user_id = state_key
|
||||
if self.is_mine_id(user_id):
|
||||
# update user_stats as it's one of our users
|
||||
public = yield self._is_public_room(room_id)
|
||||
# this accounts for transitions like leave → ban and so on.
|
||||
has_changed_joinedness = (prev_membership == Membership.JOIN) != (
|
||||
membership == Membership.JOIN
|
||||
)
|
||||
|
||||
if has_changed_joinedness:
|
||||
# update user_stats as it's one of our users
|
||||
public = yield self._is_public_room(room_id)
|
||||
|
||||
field = "public_rooms" if public else "private_rooms"
|
||||
delta = +1 if membership == Membership.JOIN else -1
|
||||
|
||||
if membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"user",
|
||||
user_id,
|
||||
"public_rooms" if public else "private_rooms",
|
||||
-1,
|
||||
)
|
||||
elif membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"user",
|
||||
user_id,
|
||||
"public_rooms" if public else "private_rooms",
|
||||
+1,
|
||||
stream_timestamp, "user", user_id, {field: delta}
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Create:
|
||||
@@ -246,28 +258,50 @@ class StatsHandler(StateDeltasHandler):
|
||||
},
|
||||
)
|
||||
|
||||
is_newly_created = True
|
||||
|
||||
elif typ == EventTypes.JoinRules:
|
||||
old_room_state = yield self.store.get_room_state(room_id)
|
||||
yield self.store.update_room_state(
|
||||
room_id, {"join_rules": event_content.get("join_rule")}
|
||||
)
|
||||
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
# whether the room would be public anyway,
|
||||
# because of history_visibility
|
||||
other_field_gives_publicity = (
|
||||
old_room_state["history_visibility"] == "world_readable"
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
|
||||
if not other_field_gives_publicity:
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(
|
||||
stream_timestamp, room_id, is_public
|
||||
)
|
||||
|
||||
elif typ == EventTypes.RoomHistoryVisibility:
|
||||
old_room_state = yield self.store.get_room_state(room_id)
|
||||
yield self.store.update_room_state(
|
||||
room_id,
|
||||
{"history_visibility": event_content.get("history_visibility")},
|
||||
)
|
||||
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
# whether the room would be public anyway,
|
||||
# because of join_rule
|
||||
other_field_gives_publicity = (
|
||||
old_room_state["join_rules"] == JoinRules.PUBLIC
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
|
||||
if not other_field_gives_publicity:
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(
|
||||
stream_timestamp, room_id, is_public
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Encryption:
|
||||
yield self.store.update_room_state(
|
||||
@@ -290,6 +324,20 @@ class StatsHandler(StateDeltasHandler):
|
||||
room_id, {"canonical_alias": event_content.get("alias")}
|
||||
)
|
||||
|
||||
if is_newly_created:
|
||||
yield self.store.update_stats_delta(
|
||||
stream_timestamp,
|
||||
"room",
|
||||
room_id,
|
||||
room_stats_delta,
|
||||
complete_with_stream_id=stream_id,
|
||||
)
|
||||
|
||||
elif len(room_stats_delta) > 0:
|
||||
yield self.store.update_stats_delta(
|
||||
stream_timestamp, "room", room_id, room_stats_delta
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_public_room_stats(self, ts, room_id, is_public):
|
||||
"""
|
||||
@@ -308,10 +356,13 @@ class StatsHandler(StateDeltasHandler):
|
||||
for user_id in user_ids:
|
||||
if self.hs.is_mine(UserID.from_string(user_id)):
|
||||
yield self.store.update_stats_delta(
|
||||
ts, "user", user_id, "public_rooms", +1 if is_public else -1
|
||||
)
|
||||
yield self.store.update_stats_delta(
|
||||
ts, "user", user_id, "private_rooms", -1 if is_public else +1
|
||||
ts,
|
||||
"user",
|
||||
user_id,
|
||||
{
|
||||
"public_rooms": +1 if is_public else -1,
|
||||
"private_rooms": -1 if is_public else +1,
|
||||
},
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -2270,8 +2270,9 @@ class EventsStore(
|
||||
"room_aliases",
|
||||
"room_depth",
|
||||
"room_memberships",
|
||||
"room_state",
|
||||
"room_stats",
|
||||
"room_stats_state",
|
||||
"room_stats_current",
|
||||
"room_stats_historical",
|
||||
"room_stats_earliest_token",
|
||||
"rooms",
|
||||
"stream_ordering_to_exterm",
|
||||
|
||||
144
synapse/storage/schema/delta/56/stats_separated1.sql
Normal file
144
synapse/storage/schema/delta/56/stats_separated1.sql
Normal file
@@ -0,0 +1,144 @@
|
||||
/* Copyright 2018 New Vector Ltd
|
||||
* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
----- First clean up from previous versions of room stats.
|
||||
|
||||
-- First remove old stats stuff
|
||||
DROP TABLE IF EXISTS room_stats;
|
||||
DROP TABLE IF EXISTS user_stats;
|
||||
DROP TABLE IF EXISTS room_stats_earliest_tokens;
|
||||
DROP TABLE IF EXISTS _temp_populate_stats_position;
|
||||
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
|
||||
DROP TABLE IF EXISTS stats_stream_pos;
|
||||
|
||||
-- Unschedule old background updates if they're still scheduled
|
||||
DELETE FROM background_updates WHERE update_name IN (
|
||||
'populate_stats_createtables',
|
||||
'populate_stats_process_rooms',
|
||||
'populate_stats_cleanup'
|
||||
);
|
||||
|
||||
----- Create tables for our version of room stats.
|
||||
|
||||
-- single-row table to track position of incremental updates
|
||||
CREATE TABLE IF NOT EXISTS stats_incremental_position (
|
||||
-- the stream_id of the last-processed state delta
|
||||
state_delta_stream_id BIGINT,
|
||||
|
||||
-- the stream_ordering of the last-processed backfilled event
|
||||
-- (this is negative)
|
||||
total_events_min_stream_ordering BIGINT,
|
||||
|
||||
-- the stream_ordering of the last-processed normally-created event
|
||||
-- (this is positive)
|
||||
total_events_max_stream_ordering BIGINT,
|
||||
|
||||
-- If true, this represents the contract agreed upon by the stats
|
||||
-- regenerator.
|
||||
-- If false, this is suitable for use by the delta/incremental processor.
|
||||
is_background_contract BOOLEAN NOT NULL PRIMARY KEY
|
||||
);
|
||||
|
||||
-- insert a null row and make sure it is the only one.
|
||||
DELETE FROM stats_incremental_position;
|
||||
INSERT INTO stats_incremental_position (
|
||||
state_delta_stream_id,
|
||||
total_events_min_stream_ordering,
|
||||
total_events_max_stream_ordering,
|
||||
is_background_contract
|
||||
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
|
||||
|
||||
-- represents PRESENT room statistics for a room
|
||||
-- only holds absolute fields
|
||||
CREATE TABLE IF NOT EXISTS room_stats_current (
|
||||
room_id TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
total_event_bytes BIGINT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
banned_members INT NOT NULL,
|
||||
|
||||
-- If initial stats regen is still to be performed: NULL
|
||||
-- If initial stats regen has been performed: the maximum delta stream
|
||||
-- position that this row takes into account.
|
||||
completed_delta_stream_id BIGINT
|
||||
);
|
||||
|
||||
|
||||
-- represents HISTORICAL room statistics for a room
|
||||
CREATE TABLE IF NOT EXISTS room_stats_historical (
|
||||
room_id TEXT NOT NULL,
|
||||
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
|
||||
-- Note that end_ts is quantised.
|
||||
end_ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
total_event_bytes BIGINT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
banned_members INT NOT NULL,
|
||||
|
||||
PRIMARY KEY (room_id, end_ts)
|
||||
);
|
||||
|
||||
-- We use this index to speed up deletion of ancient room stats.
|
||||
CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
|
||||
|
||||
-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
|
||||
-- out for us. (We would want it to review stats for a particular room.)
|
||||
|
||||
|
||||
-- represents PRESENT statistics for a user
|
||||
-- only holds absolute fields
|
||||
CREATE TABLE IF NOT EXISTS user_stats_current (
|
||||
user_id TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
public_rooms INT NOT NULL,
|
||||
private_rooms INT NOT NULL,
|
||||
|
||||
-- If initial stats regen is still to be performed: NULL
|
||||
-- If initial stats regen has been performed: the maximum delta stream
|
||||
-- position that this row takes into account.
|
||||
completed_delta_stream_id BIGINT
|
||||
);
|
||||
|
||||
-- represents HISTORICAL statistics for a user
|
||||
CREATE TABLE IF NOT EXISTS user_stats_historical (
|
||||
user_id TEXT NOT NULL,
|
||||
end_ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
|
||||
public_rooms INT NOT NULL,
|
||||
private_rooms INT NOT NULL,
|
||||
|
||||
PRIMARY KEY (user_id, end_ts)
|
||||
);
|
||||
|
||||
-- We use this index to speed up deletion of ancient user stats.
|
||||
CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
|
||||
|
||||
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
|
||||
-- out for us. (We would want it to review stats for a particular user.)
|
||||
|
||||
-- Also rename room_state to room_stats_state to make its ownership clear.
|
||||
ALTER TABLE room_state RENAME TO room_stats_state;
|
||||
@@ -0,0 +1,24 @@
|
||||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- These partial indices helps us with finding incomplete stats row
|
||||
CREATE INDEX IF NOT EXISTS room_stats_not_complete
|
||||
ON room_stats_current (room_id)
|
||||
WHERE completed_delta_stream_id IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS user_stats_not_complete
|
||||
ON user_stats_current (user_id)
|
||||
WHERE completed_delta_stream_id IS NULL;
|
||||
|
||||
27
synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
Normal file
27
synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
Normal file
@@ -0,0 +1,27 @@
|
||||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- even though SQLite >= 3.8 can support partial indices, we won't enable
|
||||
-- them, in case the SQLite database may be later used on another system.
|
||||
-- It's also the case that SQLite is only likely to be used in small
|
||||
-- deployments or testing, where the optimisations gained by use of a
|
||||
-- partial index are not a big concern.
|
||||
|
||||
CREATE INDEX IF NOT EXISTS room_stats_not_complete
|
||||
ON room_stats_current (completed_delta_stream_id, room_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS user_stats_not_complete
|
||||
ON user_stats_current (completed_delta_stream_id, user_id);
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,304 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mock import Mock
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client.v1 import login, room
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
|
||||
servlets = [
|
||||
admin.register_servlets_for_client_rest_resource,
|
||||
room.register_servlets,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor, clock, hs):
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.handler = self.hs.get_stats_handler()
|
||||
|
||||
def _add_background_updates(self):
|
||||
"""
|
||||
Add the background updates we need to run.
|
||||
"""
|
||||
# Ugh, have to reset this flag
|
||||
self.store._all_done = False
|
||||
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "populate_stats_process_rooms",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_createtables",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "populate_stats_cleanup",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_process_rooms",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
def test_initial_room(self):
|
||||
"""
|
||||
The background updates will build the table from scratch.
|
||||
"""
|
||||
r = self.get_success(self.store.get_all_room_state())
|
||||
self.assertEqual(len(r), 0)
|
||||
|
||||
# Disable stats
|
||||
self.hs.config.stats_enabled = False
|
||||
self.handler.stats_enabled = False
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1_token = self.login("u1", "pass")
|
||||
|
||||
room_1 = self.helper.create_room_as(u1, tok=u1_token)
|
||||
self.helper.send_state(
|
||||
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
|
||||
)
|
||||
|
||||
# Stats disabled, shouldn't have done anything
|
||||
r = self.get_success(self.store.get_all_room_state())
|
||||
self.assertEqual(len(r), 0)
|
||||
|
||||
# Enable stats
|
||||
self.hs.config.stats_enabled = True
|
||||
self.handler.stats_enabled = True
|
||||
|
||||
# Do the initial population of the user directory via the background update
|
||||
self._add_background_updates()
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
r = self.get_success(self.store.get_all_room_state())
|
||||
|
||||
self.assertEqual(len(r), 1)
|
||||
self.assertEqual(r[0]["topic"], "foo")
|
||||
|
||||
def test_initial_earliest_token(self):
|
||||
"""
|
||||
Ingestion via notify_new_event will ignore tokens that the background
|
||||
update have already processed.
|
||||
"""
|
||||
self.reactor.advance(86401)
|
||||
|
||||
self.hs.config.stats_enabled = False
|
||||
self.handler.stats_enabled = False
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1_token = self.login("u1", "pass")
|
||||
|
||||
u2 = self.register_user("u2", "pass")
|
||||
u2_token = self.login("u2", "pass")
|
||||
|
||||
u3 = self.register_user("u3", "pass")
|
||||
u3_token = self.login("u3", "pass")
|
||||
|
||||
room_1 = self.helper.create_room_as(u1, tok=u1_token)
|
||||
self.helper.send_state(
|
||||
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
|
||||
)
|
||||
|
||||
# Begin the ingestion by creating the temp tables. This will also store
|
||||
# the position that the deltas should begin at, once they take over.
|
||||
self.hs.config.stats_enabled = True
|
||||
self.handler.stats_enabled = True
|
||||
self.store._all_done = False
|
||||
self.get_success(self.store.update_stats_stream_pos(None))
|
||||
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
|
||||
)
|
||||
)
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
# Now, before the table is actually ingested, add some more events.
|
||||
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
|
||||
self.helper.join(room=room_1, user=u2, tok=u2_token)
|
||||
|
||||
# Now do the initial ingestion.
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "populate_stats_cleanup",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_process_rooms",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
self.store._all_done = False
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
self.reactor.advance(86401)
|
||||
|
||||
# Now add some more events, triggering ingestion. Because of the stream
|
||||
# position being set to before the events sent in the middle, a simpler
|
||||
# implementation would reprocess those events, and say there were four
|
||||
# users, not three.
|
||||
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
|
||||
self.helper.join(room=room_1, user=u3, tok=u3_token)
|
||||
|
||||
# Get the deltas! There should be two -- day 1, and day 2.
|
||||
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
|
||||
|
||||
# The oldest has 2 joined members
|
||||
self.assertEqual(r[-1]["joined_members"], 2)
|
||||
|
||||
# The newest has 3
|
||||
self.assertEqual(r[0]["joined_members"], 3)
|
||||
|
||||
def test_incorrect_state_transition(self):
|
||||
"""
|
||||
If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
|
||||
(JOIN, INVITE, LEAVE, BAN), an error is raised.
|
||||
"""
|
||||
events = {
|
||||
"a1": {"membership": Membership.LEAVE},
|
||||
"a2": {"membership": "not a real thing"},
|
||||
}
|
||||
|
||||
def get_event(event_id, allow_none=True):
|
||||
m = Mock()
|
||||
m.content = events[event_id]
|
||||
d = defer.Deferred()
|
||||
self.reactor.callLater(0.0, d.callback, m)
|
||||
return d
|
||||
|
||||
def get_received_ts(event_id):
|
||||
return defer.succeed(1)
|
||||
|
||||
self.store.get_received_ts = get_received_ts
|
||||
self.store.get_event = get_event
|
||||
|
||||
deltas = [
|
||||
{
|
||||
"type": EventTypes.Member,
|
||||
"state_key": "some_user",
|
||||
"room_id": "room",
|
||||
"event_id": "a1",
|
||||
"prev_event_id": "a2",
|
||||
"stream_id": 60,
|
||||
}
|
||||
]
|
||||
|
||||
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
|
||||
self.assertEqual(
|
||||
f.value.args[0], "'not a real thing' is not a valid prev_membership"
|
||||
)
|
||||
|
||||
# And the other way...
|
||||
deltas = [
|
||||
{
|
||||
"type": EventTypes.Member,
|
||||
"state_key": "some_user",
|
||||
"room_id": "room",
|
||||
"event_id": "a2",
|
||||
"prev_event_id": "a1",
|
||||
"stream_id": 100,
|
||||
}
|
||||
]
|
||||
|
||||
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
|
||||
self.assertEqual(
|
||||
f.value.args[0], "'not a real thing' is not a valid membership"
|
||||
)
|
||||
|
||||
def test_redacted_prev_event(self):
|
||||
"""
|
||||
If the prev_event does not exist, then it is assumed to be a LEAVE.
|
||||
"""
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1_token = self.login("u1", "pass")
|
||||
|
||||
room_1 = self.helper.create_room_as(u1, tok=u1_token)
|
||||
|
||||
# Do the initial population of the user directory via the background update
|
||||
self._add_background_updates()
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
events = {"a1": None, "a2": {"membership": Membership.JOIN}}
|
||||
|
||||
def get_event(event_id, allow_none=True):
|
||||
if events.get(event_id):
|
||||
m = Mock()
|
||||
m.content = events[event_id]
|
||||
else:
|
||||
m = None
|
||||
d = defer.Deferred()
|
||||
self.reactor.callLater(0.0, d.callback, m)
|
||||
return d
|
||||
|
||||
def get_received_ts(event_id):
|
||||
return defer.succeed(1)
|
||||
|
||||
self.store.get_received_ts = get_received_ts
|
||||
self.store.get_event = get_event
|
||||
|
||||
deltas = [
|
||||
{
|
||||
"type": EventTypes.Member,
|
||||
"state_key": "some_user:test",
|
||||
"room_id": room_1,
|
||||
"event_id": "a2",
|
||||
"prev_event_id": "a1",
|
||||
"stream_id": 100,
|
||||
}
|
||||
]
|
||||
|
||||
# Handle our fake deltas, which has a user going from LEAVE -> JOIN.
|
||||
self.get_success(self.handler._handle_deltas(deltas))
|
||||
|
||||
# One delta, with two joined members -- the room creator, and our fake
|
||||
# user.
|
||||
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
|
||||
self.assertEqual(len(r), 1)
|
||||
self.assertEqual(r[0]["joined_members"], 2)
|
||||
Reference in New Issue
Block a user