Compare commits

...

91 Commits

Author SHA1 Message Date
Erik Johnston
745f2da4b8 Merge pull request #5946 from matrix-org/rei/rss_inc8
Separated Statistics [8/7ish EOF]
2019-09-02 13:04:54 +01:00
Erik Johnston
02f759e0a2 Renamve get_room_state 2019-09-02 11:33:03 +01:00
Erik Johnston
84532a4009 Merge branch 'rei/rss_target' of github.com:matrix-org/synapse into rei/rss_inc8 2019-09-02 10:27:39 +01:00
reivilibre
e8932145ab Merge pull request #5941 from matrix-org/rei/rss_inc7
Separated Statistics [7/7ish]
2019-08-30 17:37:24 +01:00
Olivier Wilkinson (reivilibre)
ffc30b8f60 Merge branch 'develop' into rei/rss_target 2019-08-30 17:32:09 +01:00
Olivier Wilkinson (reivilibre)
fca3a9c121 Fix to use milliseconds
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 17:01:24 +01:00
Olivier Wilkinson (reivilibre)
21593febc3 Linting
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 16:32:20 +01:00
Olivier Wilkinson (reivilibre)
fc5d1182b6 Add stats docs
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 16:27:00 +01:00
Olivier Wilkinson (reivilibre)
d49457b941 Add stats tests
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 16:26:54 +01:00
Olivier Wilkinson (reivilibre)
6f5e543901 Various fixes
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 16:26:40 +01:00
Olivier Wilkinson (reivilibre)
7b977bdf55 Fixes to counting and stats deltas
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 15:58:07 +01:00
Olivier Wilkinson (reivilibre)
ab11c0a293 Whoopsies; these things come in order…
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 15:39:57 +01:00
Olivier Wilkinson (reivilibre)
b928909795 Fix incremental processor when there are no deltas.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 15:36:49 +01:00
Olivier Wilkinson (reivilibre)
98a8928fb5 Merge branch 'rei/rss_inc7' into rei/rss_inc8 2019-08-30 15:25:38 +01:00
Olivier Wilkinson (reivilibre)
50c321d8f3 Adapt to use renamed room_state
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 15:25:33 +01:00
Olivier Wilkinson (reivilibre)
eba432e541 Merge branch 'rei/rss_inc7' into rei/rss_inc8 2019-08-30 15:23:56 +01:00
Olivier Wilkinson (reivilibre)
d39c09c284 Ambiguous room_id
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 15:23:55 +01:00
Olivier Wilkinson (reivilibre)
97b20354fa Merge branch 'rei/rss_inc7' into rei/rss_inc8 2019-08-30 15:19:51 +01:00
Olivier Wilkinson (reivilibre)
4ecc62b0b6 Whoops, took out a line there...
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 15:19:50 +01:00
Olivier Wilkinson (reivilibre)
425d44521d Merge branch 'rei/rss_inc7' into rei/rss_inc8 2019-08-30 15:13:44 +01:00
Olivier Wilkinson (reivilibre)
b379a11dc2 users table's ID field is actually called name.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 15:13:41 +01:00
Olivier Wilkinson (reivilibre)
bf6d45f456 Merge branch 'rei/rss_inc7' into rei/rss_inc8 2019-08-30 15:11:14 +01:00
Olivier Wilkinson (reivilibre)
0f2e59f4f1 Fix that became apparent after unit testing
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 15:11:03 +01:00
Olivier Wilkinson (reivilibre)
7dc387e54c Merge branch 'rei/rss_inc7' into rei/rss_inc8 2019-08-30 15:08:44 +01:00
Olivier Wilkinson (reivilibre)
065042c325 Code formatting and typo pointed out by Erik.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 15:08:26 +01:00
Olivier Wilkinson (reivilibre)
440c60ef8f Some fixes that have become necessary due to changes in other PRs
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 15:05:57 +01:00
Olivier Wilkinson (reivilibre)
1d6cf15f6e Merge branch 'rei/rss_inc7' into rei/rss_inc8 2019-08-30 14:58:50 +01:00
reivilibre
8c0260299f Merge pull request #5924 from matrix-org/rei/rss_inc6
Separated Statistics [6/7ish]
2019-08-30 14:39:22 +01:00
Olivier Wilkinson (reivilibre)
893729a3a6 Code formatting
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 14:36:18 +01:00
Olivier Wilkinson (reivilibre)
44b036779e Add stats regenerator 2019-08-30 14:32:48 +01:00
Olivier Wilkinson (reivilibre)
757205d718 Convert chain to list as chain is only once iterable.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-30 14:26:02 +01:00
Olivier Wilkinson (reivilibre)
6c582d7ccb Merge branch 'rei/rss_target' into rei/rss_inc6 2019-08-30 11:21:40 +01:00
Olivier Wilkinson (reivilibre)
4c13f2b282 Merge branch 'develop' into rei/rss_target 2019-08-30 11:21:27 +01:00
reivilibre
9dbf42af8a Merge pull request #5923 from matrix-org/rei/rss_inc5
Separated Statistics [5/7ish]
2019-08-30 07:52:07 +01:00
Olivier Wilkinson (reivilibre)
60481031f2 Merge branch 'rei/rss_target' into rei/rss_inc5 2019-08-29 15:02:09 +01:00
Olivier Wilkinson (reivilibre)
7c0224d5c0 Merge branch 'rei/rss_target' into rei/rss_inc6 2019-08-29 15:01:52 +01:00
Olivier Wilkinson (reivilibre)
f7ececb0ac Merge branch 'develop' into rei/rss_target 2019-08-29 15:01:27 +01:00
Olivier Wilkinson (reivilibre)
39dbee2a3e Count total_events and total_event_bytes within the loop.
In this case, we still update these counts if we get stuck in the loop
because the server is busy.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-29 14:10:21 +01:00
Olivier Wilkinson (reivilibre)
4444b9a1b3 Code formatting (Black)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-29 08:08:41 +01:00
Olivier Wilkinson (reivilibre)
3b69bf3e74 Upsert fixes 2019-08-28 20:18:45 +01:00
Olivier Wilkinson (reivilibre)
73d552a05d Hoist up None check to prevent trying to iterate over NoneType.keys() 2019-08-28 16:14:00 +01:00
Olivier Wilkinson (reivilibre)
b06f2947e4 Track new users in user statistics.
This makes the rows 'completed' so that the stats regenerator need not
touch them.
2019-08-28 15:46:33 +01:00
Olivier Wilkinson (reivilibre)
d7a692f860 Update total_events and total_event_bytes on new events.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-28 14:57:03 +01:00
Olivier Wilkinson (reivilibre)
a13ad21abf Add incremental counting for rooms' total events and total event bytes.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-28 14:49:58 +01:00
Olivier Wilkinson (reivilibre)
bc2c284dbe Add total_event_bytes to room statistics schema.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-28 14:28:44 +01:00
reivilibre
3cdce28d3b Merge pull request #5890 from matrix-org/rei/rss_inc3
Separated Statistics [3/7ish]
2019-08-28 13:31:14 +01:00
Olivier Wilkinson (reivilibre)
81aa6d53b0 Address code review comments
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-28 10:54:22 +01:00
Olivier Wilkinson (reivilibre)
dfb22fec48 Merge branch 'rei/rss_target' into rei/rss_inc3 2019-08-28 09:53:33 +01:00
reivilibre
cc66cf1238 Merge pull request #5889 from matrix-org/rei/rss_inc2
Separated Statistics [2/7ish]
2019-08-28 09:49:38 +01:00
Olivier Wilkinson (reivilibre)
a344ad3d3f Code formatting (Black)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-28 09:33:03 +01:00
reivilibre
b9f1adc370 Update synapse/storage/stats.py
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-28 09:01:25 +01:00
Olivier Wilkinson (reivilibre)
1af7866562 Clean up code with improved naming and hoist around functions.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 16:16:53 +01:00
Olivier Wilkinson (reivilibre)
064143c130 Use DeferredLock instead of threading.Lock
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 15:00:16 +01:00
Olivier Wilkinson (reivilibre)
324f21b216 Fix logic error.
`absolute_fields` being None shouldn't preclude completion of a current
stats row.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:53:45 +01:00
Olivier Wilkinson (reivilibre)
10c1a233f9 Fix logic error.
`absolute_fields` being None shouldn't preclude completion of a current
stats row.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:49:12 +01:00
Olivier Wilkinson (reivilibre)
44d3c2e80b Invalidate get_earliest_token_for_stats cache as required.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:34:20 +01:00
Olivier Wilkinson (reivilibre)
07c267c516 For user stats, handle other membership transitions properly.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:34:05 +01:00
Olivier Wilkinson (reivilibre)
62b1250629 Update _purge_room_txn to take account of separated stats tables
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:24:56 +01:00
Olivier Wilkinson (reivilibre)
11c4e506bd Rename room_state table to room_stats_state
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:24:25 +01:00
Olivier Wilkinson (reivilibre)
491eaf0808 Remove obsolete OldCollectionRequired as old collection is obsolete.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:19:38 +01:00
Olivier Wilkinson (reivilibre)
dd8e6020d8 For user stats, handle other membership transitions properly.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:17:04 +01:00
Olivier Wilkinson (reivilibre)
99c88ac84e No-op if no membership change and thus simplify verbose dict updates.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:14:17 +01:00
Olivier Wilkinson (reivilibre)
3b09a37682 Adapt to stats now working in milliseconds
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:10:39 +01:00
Olivier Wilkinson (reivilibre)
bc754cdeed Merge branch 'rei/rss_inc2' into rei/rss_inc3 2019-08-27 13:54:38 +01:00
Olivier Wilkinson (reivilibre)
c775f310e9 Don't include the room & user stats docs in this PR.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:51:25 +01:00
Olivier Wilkinson (reivilibre)
09cbc3a8e9 Switch to milliseconds in room/user stats for consistency.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:50:58 +01:00
Olivier Wilkinson (reivilibre)
736ac58e11 Code formatting (Black)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:42:33 +01:00
Olivier Wilkinson (reivilibre)
a6c102009e Lock tables in upsert fall-backs.
Should not be too much of a performance concern as this code won't be
hit on Postgres, which large deployments should be using.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:41:48 +01:00
Olivier Wilkinson (reivilibre)
544ba2c2e9 Apply minor suggestions from review
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:40:00 +01:00
Olivier Wilkinson (reivilibre)
81c5289c83 Clarify _update_stats_delta_txn by adding code comments and kwargs.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:35:37 +01:00
reivilibre
4b7bf2e413 Apply suggestions from code review
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-27 13:26:08 +01:00
Olivier Wilkinson (reivilibre)
5043ef801a Merge branch 'rei/rss_target' into rei/rss_inc2 2019-08-27 11:56:34 +01:00
Olivier Wilkinson (reivilibre)
baeaf00a12 Merge branch 'develop' into rei/rss_target 2019-08-27 11:55:27 +01:00
Olivier Wilkinson (reivilibre)
1ecd1a6a5f Use engine-specific delta SQL files rather than delta written in Python.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 09:50:49 +01:00
Olivier Wilkinson (reivilibre)
c3d2bf2807 Allow schema deltas to be engine-specific
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 08:52:20 +01:00
Olivier Wilkinson (reivilibre)
79252d1c83 Fix up historical stats support.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-22 16:10:32 +01:00
Olivier Wilkinson (reivilibre)
e8fc180d4d Fix up SQL schema delta
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-22 16:10:05 +01:00
Olivier Wilkinson (reivilibre)
7b657f1148 Simplify table structure
This obviates the need for old collection, but comes at the minor cost
of not being able to track historical stats or per-slice fields until
after the statistics regenerator is finished.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-22 15:40:58 +01:00
Olivier Wilkinson (reivilibre)
18a4c03c50 Remove needless defaults.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 16:04:04 +01:00
Olivier Wilkinson (reivilibre)
eafa8d3c54 Unify name of 'stats regenerator' in schema comments.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 16:03:37 +01:00
Olivier Wilkinson (reivilibre)
977310ee27 Clarify _update_stats_delta_txn
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 15:49:00 +01:00
Olivier Wilkinson (reivilibre)
981c6cf544 Sanitise accepted fields in _update_stats_delta_txn
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 15:41:10 +01:00
Olivier Wilkinson (reivilibre)
6a19f7e101 Add room and user statistics documentation.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 15:35:20 +01:00
reivilibre
4a97eef0dc Update synapse/storage/stats.py
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-20 15:12:21 +01:00
reivilibre
b5573c0ffb Update synapse/storage/stats.py
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-20 15:02:49 +01:00
Olivier Wilkinson (reivilibre)
1819563640 Ack, isort!
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:27:47 +01:00
Olivier Wilkinson (reivilibre)
e4cbea6c46 Handle state deltas and turn them into stats deltas
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:24:35 +01:00
Olivier Wilkinson (reivilibre)
80a1c6e9e5 Add storage function for storing stats deltas
Old collection is not included in this commit

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:08:02 +01:00
Olivier Wilkinson (reivilibre)
d7675e79e1 Add schema for Separated Statistics
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 13:45:05 +01:00
reivilibre
8de9ebe35d Tear out current room & user statistics (#5880)
* Tear out current room & user statistics.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>

* Black is back with more linting complaints

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 13:22:07 +01:00
Olivier Wilkinson (reivilibre)
8374bcb0a8 Newsfile
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-19 15:25:53 +01:00
12 changed files with 2018 additions and 324 deletions

1
changelog.d/5879.misc Normal file
View File

@@ -0,0 +1 @@
Rework room and user statistics to separate current & historical rows, as well as track stats correctly.

View File

@@ -0,0 +1,136 @@
Room and User Statistics
========================
Synapse maintains room and user statistics (as well as a cache of room state),
in various tables.
These can be used for administrative purposes but are also used when generating
the public room directory. If these tables get stale or out of sync (possibly
after database corruption), you may wish to regenerate them.
# Synapse Administrator Documentation
## Various SQL scripts that you may find useful
### Delete stats, including historical stats
```sql
DELETE FROM room_stats_current;
DELETE FROM room_stats_historical;
DELETE FROM user_stats_current;
DELETE FROM user_stats_historical;
```
### Regenerate stats (all subjects)
```sql
BEGIN;
DELETE FROM stats_incremental_position;
INSERT INTO stats_incremental_position (
state_delta_stream_id,
total_events_min_stream_ordering,
total_events_max_stream_ordering,
is_background_contract
) VALUES (NULL, NULL, NULL, FALSE), (NULL, NULL, NULL, TRUE);
COMMIT;
DELETE FROM room_stats_current;
DELETE FROM user_stats_current;
```
then follow the steps below for **'Regenerate stats (missing subjects only)'**
### Regenerate stats (missing subjects only)
```sql
-- Set up staging tables
-- we depend on current_state_events_membership because this is used
-- in our counting.
INSERT INTO background_updates (update_name, progress_json) VALUES
('populate_stats_prepare', '{}', 'current_state_events_membership');
-- Run through each room and update stats
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
-- Run through each user and update stats.
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
-- Clean up staging tables
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_cleanup', '{}', 'populate_stats_process_users');
```
then **restart Synapse**.
# Synapse Developer Documentation
## High-Level Concepts
### Definitions
* **subject**: Something we are tracking stats about currently a room or user.
* **current row**: An entry for a subject in the appropriate current statistics
table. Each subject can have only one.
* **historical row**: An entry for a subject in the appropriate historical
statistics table. Each subject can have any number of these.
### Overview
Stats are maintained as time series. There are two kinds of column:
* absolute columns where the value is correct for the time given by `end_ts`
in the stats row. (Imagine a line graph for these values)
* They can also be thought of as 'gauges' in Prometheus, if you are familiar.
* per-slice columns where the value corresponds to how many of the occurrences
occurred within the time slice given by `(end_ts bucket_size)…end_ts`
or `start_ts…end_ts`. (Imagine a histogram for these values)
Currently, only absolute columns are in use.
Stats are maintained in two tables (for each type): current and historical.
Current stats correspond to the present values. Each subject can only have one
entry.
Historical stats correspond to values in the past. Subjects may have multiple
entries.
## Concepts around the management of stats
### current rows
Current rows contain the most up-to-date statistics for a room.
They only contain absolute columns
#### incomplete current rows
There are also **incomplete** current rows, which are current rows that do not
contain a full count yet this is because they are waiting for the regeneration
process to give them an initial count. Incomplete current rows DO NOT contain
correct and up-to-date values. As such, *incomplete rows are not old-collected*.
Instead, old incomplete rows will be extended so they are no longer old.
### historical rows
Historical rows can always be considered to be valid for the time slice and
end time specified. (This, of course, assumes a lack of defects in the code
to track the statistics, and assumes integrity of the database).
Even still, there are two considerations that we may need to bear in mind:
* historical rows will not exist for every time slice they will be omitted
if there were no changes. In this case, the following assumptions can be
made to interpolate/recreate missing rows:
- absolute fields have the same values as in the preceding row
- per-slice fields are zero (`0`)
* historical rows will not be retained forever rows older than a configurable
time will be purged.
#### purge
The purging of historical rows is not yet implemented.

View File

@@ -27,19 +27,16 @@ class StatsConfig(Config):
def read_config(self, config, **kwargs):
self.stats_enabled = True
self.stats_bucket_size = 86400
self.stats_bucket_size = 86400 * 1000
self.stats_retention = sys.maxsize
stats_config = config.get("stats", None)
if stats_config:
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
self.stats_bucket_size = (
self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
self.stats_bucket_size = self.parse_duration(
stats_config.get("bucket_size", "1d")
)
self.stats_retention = (
self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
)
/ 1000
self.stats_retention = self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):

View File

@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
# The current position in the current_state_delta stream
self.pos = None
# Guard to ensure we only process deltas one at a time
self._is_processing = False
if hs.config.stats_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -65,43 +62,61 @@ class StatsHandler(StateDeltasHandler):
if not self.hs.config.stats_enabled:
return
if self._is_processing:
return
lock = self.store.stats_delta_processing_lock
@defer.inlineCallbacks
def process():
yield lock.acquire()
try:
yield self._unsafe_process()
finally:
self._is_processing = False
yield lock.release()
self._is_processing = True
run_as_background_process("stats.notify_new_event", process)
if not lock.locked:
# we only want to run this process one-at-a-time,
# and also, if the initial background updater wants us to keep out,
# we should respect that.
run_as_background_process("stats.notify_new_event", process)
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_stats_stream_pos()
# If None is one of the values, then means that the stats regenerator has not (or had not) yet unwedged us
# but note that this might be outdated, so we retrieve the positions again.
if self.pos is None or None in self.pos.values():
self.pos = yield self.store.get_stats_positions()
# If still None then the initial background update hasn't happened yet
if self.pos is None:
# If still contains a None position, then the stats regenerator hasn't started yet
if None in self.pos.values():
return None
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(self.pos)
if not deltas:
return
deltas = yield self.store.get_current_state_deltas(
self.pos["state_delta_stream_id"]
)
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
if deltas:
logger.debug("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
yield self.store.update_stats_stream_pos(self.pos)
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
yield self.store.update_stats_positions(self.pos)
event_processing_positions.labels("stats").set(self.pos)
event_processing_positions.labels("stats").set(
self.pos["state_delta_stream_id"]
)
# Then count deltas for total_events and total_event_bytes.
with Measure(self.clock, "stats_total_events_and_bytes"):
self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes(
self.pos
)
if not deltas and not had_counts:
break
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
@@ -115,11 +130,10 @@ class StatsHandler(StateDeltasHandler):
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
stream_pos = delta["stream_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
token = yield self.store.get_earliest_token_for_room_stats(room_id)
token = yield self.store.get_earliest_token_for_stats("room", room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
@@ -131,7 +145,10 @@ class StatsHandler(StateDeltasHandler):
continue
if event_id is None and prev_event_id is None:
# Errr...
logger.error(
"event ID is None and so is the previous event ID. stream_id: %s",
stream_id,
)
continue
event_content = {}
@@ -141,94 +158,86 @@ class StatsHandler(StateDeltasHandler):
if event:
event_content = event.content or {}
# We use stream_pos here rather than fetch by event_id as event_id
# may be None
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
# We can't afford for this time to stray into the past, so we count
# it as now.
stream_timestamp = int(self.clock.time_msec())
# quantise time to the nearest bucket
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
# All the values in this dict are deltas (RELATIVE changes)
room_stats_delta = {}
is_newly_created = False
if prev_event_id is None:
# this state event doesn't overwrite another,
# so it is a new effective/current state event
room_stats_delta["current_state_events"] = 1
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
prev_event_content = {}
# We take None rather than leave as a previous membership
# in the absence of a previous event because we do not want to
# reduce the leave count when a new-to-the-room user joins.
prev_membership = None
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
prev_membership = prev_event_content.get(
"membership", Membership.LEAVE
)
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
if prev_membership == membership:
continue
if prev_membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", -1
)
if prev_membership is None:
logger.debug("No previous membership for this user.")
elif membership == prev_membership:
pass # noop
elif prev_membership == Membership.JOIN:
room_stats_delta["joined_members"] = -1
elif prev_membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", -1
)
room_stats_delta["invited_members"] = -1
elif prev_membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", -1
)
room_stats_delta["left_members"] = -1
elif prev_membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", -1
)
room_stats_delta["banned_members"] = -1
else:
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
logger.error(err)
raise ValueError(err)
raise ValueError(
"%r is not a valid prev_membership" % (prev_membership,)
)
if membership == prev_membership:
pass # noop
if membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", +1
)
room_stats_delta["joined_members"] = +1
elif membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", +1
)
room_stats_delta["invited_members"] = +1
elif membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", +1
)
room_stats_delta["left_members"] = +1
elif membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", +1
)
room_stats_delta["banned_members"] = +1
else:
err = "%s is not a valid membership" % (repr(membership),)
logger.error(err)
raise ValueError(err)
raise ValueError("%r is not a valid membership" % (membership,))
user_id = state_key
if self.is_mine_id(user_id):
# update user_stats as it's one of our users
public = yield self._is_public_room(room_id)
# this accounts for transitions like leave → ban and so on.
has_changed_joinedness = (prev_membership == Membership.JOIN) != (
membership == Membership.JOIN
)
if has_changed_joinedness:
# update user_stats as it's one of our users
public = yield self._is_public_room(room_id)
field = "public_rooms" if public else "private_rooms"
delta = +1 if membership == Membership.JOIN else -1
if membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
-1,
)
elif membership == Membership.JOIN:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
+1,
stream_timestamp, "user", user_id, {field: delta}
)
elif typ == EventTypes.Create:
@@ -246,28 +255,50 @@ class StatsHandler(StateDeltasHandler):
},
)
is_newly_created = True
elif typ == EventTypes.JoinRules:
old_room_state = yield self.store.get_room_stats_state(room_id)
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
# whether the room would be public anyway,
# because of history_visibility
other_field_gives_publicity = (
old_room_state["history_visibility"] == "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
if not other_field_gives_publicity:
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(
stream_timestamp, room_id, is_public
)
elif typ == EventTypes.RoomHistoryVisibility:
old_room_state = yield self.store.get_room_stats_state(room_id)
yield self.store.update_room_state(
room_id,
{"history_visibility": event_content.get("history_visibility")},
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
# whether the room would be public anyway,
# because of join_rule
other_field_gives_publicity = (
old_room_state["join_rules"] == JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
if not other_field_gives_publicity:
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(
stream_timestamp, room_id, is_public
)
elif typ == EventTypes.Encryption:
yield self.store.update_room_state(
@@ -290,6 +321,20 @@ class StatsHandler(StateDeltasHandler):
room_id, {"canonical_alias": event_content.get("alias")}
)
if is_newly_created:
yield self.store.update_stats_delta(
stream_timestamp,
"room",
room_id,
room_stats_delta,
complete_with_stream_id=stream_id,
)
elif len(room_stats_delta) > 0:
yield self.store.update_stats_delta(
stream_timestamp, "room", room_id, room_stats_delta
)
@defer.inlineCallbacks
def update_public_room_stats(self, ts, room_id, is_public):
"""
@@ -308,10 +353,13 @@ class StatsHandler(StateDeltasHandler):
for user_id in user_ids:
if self.hs.is_mine(UserID.from_string(user_id)):
yield self.store.update_stats_delta(
ts, "user", user_id, "public_rooms", +1 if is_public else -1
)
yield self.store.update_stats_delta(
ts, "user", user_id, "private_rooms", -1 if is_public else +1
ts,
"user",
user_id,
{
"public_rooms": +1 if is_public else -1,
"private_rooms": -1 if is_public else +1,
},
)
@defer.inlineCallbacks

View File

@@ -2270,8 +2270,9 @@ class EventsStore(
"room_aliases",
"room_depth",
"room_memberships",
"room_state",
"room_stats",
"room_stats_state",
"room_stats_current",
"room_stats_historical",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",

View File

@@ -869,6 +869,17 @@ class RegistrationStore(
(user_id_obj.localpart, create_profile_with_displayname),
)
if self.hs.config.stats_enabled:
# we create a new completed user statistics row
# we don't strictly need current_token since this user really can't
# have any state deltas before now (as it is a new user), but still,
# we include it for completeness.
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
self._update_stats_delta_txn(
txn, now, "user", user_id, {}, complete_with_stream_id=current_token
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
@@ -1140,6 +1151,7 @@ class RegistrationStore(
deferred str|None: A str representing a link to redirect the user
to if there is one.
"""
# Insert everything into a transaction in order to run atomically
def validate_threepid_session_txn(txn):
row = self._simple_select_one_txn(

View File

@@ -0,0 +1,144 @@
/* Copyright 2018 New Vector Ltd
* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
----- First clean up from previous versions of room stats.
-- First remove old stats stuff
DROP TABLE IF EXISTS room_stats;
DROP TABLE IF EXISTS user_stats;
DROP TABLE IF EXISTS room_stats_earliest_tokens;
DROP TABLE IF EXISTS _temp_populate_stats_position;
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
DROP TABLE IF EXISTS stats_stream_pos;
-- Unschedule old background updates if they're still scheduled
DELETE FROM background_updates WHERE update_name IN (
'populate_stats_createtables',
'populate_stats_process_rooms',
'populate_stats_cleanup'
);
----- Create tables for our version of room stats.
-- single-row table to track position of incremental updates
CREATE TABLE IF NOT EXISTS stats_incremental_position (
-- the stream_id of the last-processed state delta
state_delta_stream_id BIGINT,
-- the stream_ordering of the last-processed backfilled event
-- (this is negative)
total_events_min_stream_ordering BIGINT,
-- the stream_ordering of the last-processed normally-created event
-- (this is positive)
total_events_max_stream_ordering BIGINT,
-- If true, this represents the contract agreed upon by the stats
-- regenerator.
-- If false, this is suitable for use by the delta/incremental processor.
is_background_contract BOOLEAN NOT NULL PRIMARY KEY
);
-- insert a null row and make sure it is the only one.
DELETE FROM stats_incremental_position;
INSERT INTO stats_incremental_position (
state_delta_stream_id,
total_events_min_stream_ordering,
total_events_max_stream_ordering,
is_background_contract
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
-- represents PRESENT room statistics for a room
-- only holds absolute fields
CREATE TABLE IF NOT EXISTS room_stats_current (
room_id TEXT NOT NULL PRIMARY KEY,
current_state_events INT NOT NULL,
total_events INT NOT NULL,
total_event_bytes BIGINT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
-- If initial stats regen is still to be performed: NULL
-- If initial stats regen has been performed: the maximum delta stream
-- position that this row takes into account.
completed_delta_stream_id BIGINT
);
-- represents HISTORICAL room statistics for a room
CREATE TABLE IF NOT EXISTS room_stats_historical (
room_id TEXT NOT NULL,
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
-- Note that end_ts is quantised.
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
current_state_events INT NOT NULL,
total_events INT NOT NULL,
total_event_bytes BIGINT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
PRIMARY KEY (room_id, end_ts)
);
-- We use this index to speed up deletion of ancient room stats.
CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular room.)
-- represents PRESENT statistics for a user
-- only holds absolute fields
CREATE TABLE IF NOT EXISTS user_stats_current (
user_id TEXT NOT NULL PRIMARY KEY,
public_rooms INT NOT NULL,
private_rooms INT NOT NULL,
-- If initial stats regen is still to be performed: NULL
-- If initial stats regen has been performed: the maximum delta stream
-- position that this row takes into account.
completed_delta_stream_id BIGINT
);
-- represents HISTORICAL statistics for a user
CREATE TABLE IF NOT EXISTS user_stats_historical (
user_id TEXT NOT NULL,
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
public_rooms INT NOT NULL,
private_rooms INT NOT NULL,
PRIMARY KEY (user_id, end_ts)
);
-- We use this index to speed up deletion of ancient user stats.
CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular user.)
-- Also rename room_state to room_stats_state to make its ownership clear.
ALTER TABLE room_state RENAME TO room_stats_state;

View File

@@ -0,0 +1,24 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- These partial indices helps us with finding incomplete stats row
CREATE INDEX IF NOT EXISTS room_stats_not_complete
ON room_stats_current (room_id)
WHERE completed_delta_stream_id IS NULL;
CREATE INDEX IF NOT EXISTS user_stats_not_complete
ON user_stats_current (user_id)
WHERE completed_delta_stream_id IS NULL;

View File

@@ -0,0 +1,27 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- even though SQLite >= 3.8 can support partial indices, we won't enable
-- them, in case the SQLite database may be later used on another system.
-- It's also the case that SQLite is only likely to be used in small
-- deployments or testing, where the optimisations gained by use of a
-- partial index are not a big concern.
CREATE INDEX IF NOT EXISTS room_stats_not_complete
ON room_stats_current (completed_delta_stream_id, room_id);
CREATE INDEX IF NOT EXISTS user_stats_not_complete
ON user_stats_current (completed_delta_stream_id, user_id);

File diff suppressed because it is too large Load Diff

View File

@@ -17,12 +17,18 @@ from mock import Mock
from twisted.internet import defer
from synapse import storage
from synapse.api.constants import EventTypes, Membership
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
# The expected number of state events in a fresh public room.
EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM = 5
# The expected number of state events in a fresh private room.
EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM = 6
class StatsRoomTests(unittest.HomeserverTestCase):
@@ -33,7 +39,6 @@ class StatsRoomTests(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
self.handler = self.hs.get_stats_handler()
@@ -47,7 +52,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
self.get_success(
@@ -56,7 +61,17 @@ class StatsRoomTests(unittest.HomeserverTestCase):
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
"depends_on": "populate_stats_createtables",
"depends_on": "populate_stats_prepare",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_users",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
@@ -66,16 +81,43 @@ class StatsRoomTests(unittest.HomeserverTestCase):
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
"depends_on": "populate_stats_process_users",
},
)
)
def get_all_room_state(self):
return self.store._simple_select_list(
"room_stats_state", None, retcols=("name", "topic", "canonical_alias")
)
def _get_current_stats(self, stats_type, stat_id):
table, id_col = storage.stats.TYPE_TO_TABLE[stats_type]
cols = (
["completed_delta_stream_id"]
+ list(storage.stats.ABSOLUTE_STATS_FIELDS[stats_type])
+ list(storage.stats.PER_SLICE_FIELDS[stats_type])
)
return self.get_success(
self.store._simple_select_one(
table + "_current", {id_col: stat_id}, cols, allow_none=True
)
)
def _perform_background_initial_update(self):
# Do the initial population of the stats via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
def test_initial_room(self):
"""
The background updates will build the table from scratch.
"""
r = self.get_success(self.store.get_all_room_state())
r = self.get_success(self.get_all_room_state())
self.assertEqual(len(r), 0)
# Disable stats
@@ -91,7 +133,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
)
# Stats disabled, shouldn't have done anything
r = self.get_success(self.store.get_all_room_state())
r = self.get_success(self.get_all_room_state())
self.assertEqual(len(r), 0)
# Enable stats
@@ -104,7 +146,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
r = self.get_success(self.store.get_all_room_state())
r = self.get_success(self.get_all_room_state())
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["topic"], "foo")
@@ -114,6 +156,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
Ingestion via notify_new_event will ignore tokens that the background
update have already processed.
"""
self.reactor.advance(86401)
self.hs.config.stats_enabled = False
@@ -138,12 +181,12 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
self.store._all_done = False
self.get_success(self.store.update_stats_stream_pos(None))
self.get_success(self.store.update_stats_positions(None))
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
@@ -154,6 +197,8 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
self.helper.join(room=room_1, user=u2, tok=u2_token)
# orig_delta_processor = self.store.
# Now do the initial ingestion.
self.get_success(
self.store._simple_insert(
@@ -185,8 +230,15 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
self.helper.join(room=room_1, user=u3, tok=u3_token)
# Get the deltas! There should be two -- day 1, and day 2.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
# self.handler.notify_new_event()
# We need to let the delta processor advance…
self.pump(10 * 60)
# Get the slices! There should be two -- day 1, and day 2.
r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
self.assertEqual(len(r), 2)
# The oldest has 2 joined members
self.assertEqual(r[-1]["joined_members"], 2)
@@ -259,7 +311,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
room_1 = self.helper.create_room_as(u1, tok=u1_token)
# Do the initial population of the user directory via the background update
# Do the initial population of the stats via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
@@ -299,6 +351,548 @@ class StatsRoomTests(unittest.HomeserverTestCase):
# One delta, with two joined members -- the room creator, and our fake
# user.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["joined_members"], 2)
def test_create_user(self):
"""
When we create a user, it should have statistics already ready.
"""
u1 = self.register_user("u1", "pass")
u1stats = self._get_current_stats("user", u1)
self.assertIsNotNone(u1stats)
# row is complete
self.assertIsNotNone(u1stats["completed_delta_stream_id"])
# not in any rooms by default
self.assertEqual(u1stats["public_rooms"], 0)
self.assertEqual(u1stats["private_rooms"], 0)
def test_create_room(self):
"""
When we create a room, it should have statistics already ready.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
r1stats = self._get_current_stats("room", r1)
r2 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
r2stats = self._get_current_stats("room", r2)
self.assertIsNotNone(r1stats)
self.assertIsNotNone(r2stats)
# row is complete
self.assertIsNotNone(r1stats["completed_delta_stream_id"])
self.assertIsNotNone(r2stats["completed_delta_stream_id"])
# contains the default things you'd expect in a fresh room
self.assertEqual(
r1stats["total_events"],
EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM,
"Wrong number of total_events in new room's stats!"
" You may need to update this if more state events are added to"
" the room creation process.",
)
self.assertEqual(
r2stats["total_events"],
EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
"Wrong number of total_events in new room's stats!"
" You may need to update this if more state events are added to"
" the room creation process.",
)
self.assertEqual(
r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
)
self.assertEqual(
r2stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM
)
self.assertEqual(r1stats["joined_members"], 1)
self.assertEqual(r1stats["invited_members"], 0)
self.assertEqual(r1stats["banned_members"], 0)
self.assertEqual(r2stats["joined_members"], 1)
self.assertEqual(r2stats["invited_members"], 0)
self.assertEqual(r2stats["banned_members"], 0)
def test_send_message_increments_total_events(self):
"""
When we send a message, it increments total_events.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.send(r1, "hiss", tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
def test_send_state_event_nonoverwriting(self):
"""
When we send a non-overwriting state event, it increments total_events AND current_state_events
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
self.helper.send_state(
r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.send_state(
r1, "cat.hissing", {"value": False}, tok=u1token, state_key="moggy"
)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
1,
)
def test_send_state_event_overwriting(self):
"""
When we send an overwriting state event, it increments total_events ONLY
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
self.helper.send_state(
r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.send_state(
r1, "cat.hissing", {"value": False}, tok=u1token, state_key="tabby"
)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
def test_join_first_time(self):
"""
When a user joins a room for the first time, total_events, current_state_events and
joined_members should increase by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
r1stats_ante = self._get_current_stats("room", r1)
self.helper.join(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
1,
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], 1
)
def test_join_after_leave(self):
"""
When a user joins a room after being previously left, total_events and
joined_members should increase by exactly 1.
current_state_events should not increase.
left_members should decrease by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.join(r1, u2, tok=u2token)
self.helper.leave(r1, u2, tok=u2token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.join(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
)
self.assertEqual(
r1stats_post["left_members"] - r1stats_ante["left_members"], -1
)
def test_invited(self):
"""
When a user invites another user, current_state_events, total_events and
invited_members should increase by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
r1stats_ante = self._get_current_stats("room", r1)
self.helper.invite(r1, u1, u2, tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
1,
)
self.assertEqual(
r1stats_post["invited_members"] - r1stats_ante["invited_members"], +1
)
def test_join_after_invite(self):
"""
When a user joins a room after being invited, total_events and
joined_members should increase by exactly 1.
current_state_events should not increase.
invited_members should decrease by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.invite(r1, u1, u2, tok=u1token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.join(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
)
self.assertEqual(
r1stats_post["invited_members"] - r1stats_ante["invited_members"], -1
)
def test_left(self):
"""
When a user leaves a room after joining, total_events and
left_members should increase by exactly 1.
current_state_events should not increase.
joined_members should decrease by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.join(r1, u2, tok=u2token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.leave(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["left_members"] - r1stats_ante["left_members"], +1
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
)
def test_banned(self):
"""
When a user is banned from a room after joining, total_events and
left_members should increase by exactly 1.
current_state_events should not increase.
banned_members should decrease by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.join(r1, u2, tok=u2token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.change_membership(r1, u1, u2, "ban", tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["banned_members"] - r1stats_ante["banned_members"], +1
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
)
def test_initial_background_update(self):
"""
Test that statistics can be generated by the initial background update
handler.
This test also tests that stats rows are not created for new subjects
when stats are disabled. However, it may be desirable to change this
behaviour eventually to still keep current rows.
"""
self.hs.config.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
# test that these subjects, which were created during a time of disabled
# stats, do not have stats.
self.assertIsNone(self._get_current_stats("room", r1))
self.assertIsNone(self._get_current_stats("user", u1))
self.hs.config.stats_enabled = True
self._perform_background_initial_update()
r1stats = self._get_current_stats("room", r1)
u1stats = self._get_current_stats("user", u1)
self.assertIsNotNone(r1stats["completed_delta_stream_id"])
self.assertIsNotNone(u1stats["completed_delta_stream_id"])
self.assertEqual(r1stats["joined_members"], 1)
self.assertEqual(
r1stats["total_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
)
self.assertEqual(
r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
)
self.assertEqual(u1stats["public_rooms"], 1)
def test_incomplete_stats(self):
"""
This tests that we track incomplete statistics.
We first test that incomplete stats are incrementally generated,
following the preparation of a background regen.
We then test that these incomplete rows are completed by the background
regen.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
u3 = self.register_user("u3", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
# preparation stage of the initial background update
# Ugh, have to reset this flag
self.store._all_done = False
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
self.get_success(
self.store._simple_delete(
"room_stats_current", {"1": 1}, "test_delete_stats"
)
)
self.get_success(
self.store._simple_delete(
"user_stats_current", {"1": 1}, "test_delete_stats"
)
)
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
r1stats_ante = self._get_current_stats("room", r1)
u1stats_ante = self._get_current_stats("user", u1)
u2stats_ante = self._get_current_stats("user", u2)
self.helper.invite(r1, u1, u2, tok=u1token)
self.helper.join(r1, u2, tok=u2token)
self.helper.invite(r1, u1, u3, tok=u1token)
self.helper.send(r1, "thou shalt yield", tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
u1stats_post = self._get_current_stats("user", u1)
u2stats_post = self._get_current_stats("user", u2)
# now let the background update continue & finish
self.store._all_done = False
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
"depends_on": "populate_stats_prepare",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_users",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_users",
},
)
)
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
r1stats_complete = self._get_current_stats("room", r1)
u1stats_complete = self._get_current_stats("user", u1)
u2stats_complete = self._get_current_stats("user", u2)
# now we make our assertions
# first check that none of the stats rows were complete before
# the background update occurred.
self.assertIsNone(r1stats_ante["completed_delta_stream_id"])
self.assertIsNone(r1stats_post["completed_delta_stream_id"])
self.assertIsNone(u1stats_ante["completed_delta_stream_id"])
self.assertIsNone(u1stats_post["completed_delta_stream_id"])
self.assertIsNone(u2stats_ante["completed_delta_stream_id"])
self.assertIsNone(u2stats_post["completed_delta_stream_id"])
# check that _ante rows are all skeletons without any deltas applied
self.assertEqual(r1stats_ante["joined_members"], 0)
self.assertEqual(r1stats_ante["invited_members"], 0)
self.assertEqual(r1stats_ante["total_events"], 0)
self.assertEqual(r1stats_ante["current_state_events"], 0)
self.assertEqual(u1stats_ante["public_rooms"], 0)
self.assertEqual(u1stats_ante["private_rooms"], 0)
self.assertEqual(u2stats_ante["public_rooms"], 0)
self.assertEqual(u2stats_ante["private_rooms"], 0)
# check that _post rows have the expected deltas applied
self.assertEqual(r1stats_post["joined_members"], 1)
self.assertEqual(r1stats_post["invited_members"], 1)
self.assertEqual(r1stats_post["total_events"], 4)
self.assertEqual(r1stats_post["current_state_events"], 2)
self.assertEqual(u1stats_post["public_rooms"], 0)
self.assertEqual(u1stats_post["private_rooms"], 0)
self.assertEqual(u2stats_post["public_rooms"], 0)
self.assertEqual(u2stats_post["private_rooms"], 1)
# check that _complete rows are complete and correct
self.assertEqual(r1stats_complete["joined_members"], 2)
self.assertEqual(r1stats_complete["invited_members"], 1)
self.assertEqual(
r1stats_complete["total_events"],
4 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
)
self.assertEqual(
r1stats_complete["current_state_events"],
2 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
)
self.assertEqual(u1stats_complete["public_rooms"], 0)
self.assertEqual(u1stats_complete["private_rooms"], 1)
self.assertEqual(u2stats_complete["public_rooms"], 0)
self.assertEqual(u2stats_complete["private_rooms"], 1)

View File

@@ -128,8 +128,12 @@ class RestHelper(object):
return channel.json_body
def send_state(self, room_id, event_type, body, tok, expect_code=200):
path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
def send_state(self, room_id, event_type, body, tok, expect_code=200, state_key=""):
path = "/_matrix/client/r0/rooms/%s/state/%s/%s" % (
room_id,
event_type,
state_key,
)
if tok:
path = path + "?access_token=%s" % tok