Mirror of https://github.com/element-hq/synapse.git (synced 2025-12-05 01:10:13 +00:00)

Merge commit '89cb613a4e' into release-v1.129
@@ -30,13 +30,10 @@ The following statistics are sent to the configured reporting endpoint:
 | `python_version` | string | The Python version number in use (e.g "3.7.1"). Taken from `sys.version_info`. |
 | `total_users` | int | The number of registered users on the homeserver. |
 | `total_nonbridged_users` | int | The number of users, excluding those created by an Application Service. |
-| `daily_user_type_native` | int | The number of native, non-guest users created in the last 24 hours. |
+| `daily_user_type_native` | int | The number of native users created in the last 24 hours. |
 | `daily_user_type_guest` | int | The number of guest users created in the last 24 hours. |
 | `daily_user_type_bridged` | int | The number of users created by Application Services in the last 24 hours. |
 | `total_room_count` | int | The total number of rooms present on the homeserver. |
-| `total_event_count` | int | The total number of events present on the homeserver. |
-| `total_message_count` | int | The total number of non-state events with type `m.room.message` present on the homeserver. |
-| `total_e2ee_event_count` | int | The total number of non-state events with type `m.room.encrypted` present on the homeserver. This can be used as a slight over-estimate for the number of encrypted messages. |
 | `daily_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 24 hours. |
 | `monthly_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 30 days. |
 | `daily_active_rooms` | int | The number of rooms that have had a (state) event with the type `m.room.message` sent in them in the last 24 hours. |
@@ -53,8 +50,8 @@ The following statistics are sent to the configured reporting endpoint:
 | `cache_factor` | int | The configured [`global factor`](../../configuration/config_documentation.md#caching) value for caching. |
 | `event_cache_size` | int | The configured [`event_cache_size`](../../configuration/config_documentation.md#caching) value for caching. |
 | `database_engine` | string | The database engine that is in use. Either "psycopg2" meaning PostgreSQL is in use, or "sqlite3" for SQLite3. |
 | `database_server_version` | string | The version of the database server. Examples being "10.10" for PostgreSQL server version 10.10, and "3.38.5" for SQLite 3.38.5 installed on the system. |
 | `log_level` | string | The log level in use. Examples are "INFO", "WARNING", "ERROR", "DEBUG", etc. |

 [^1]: Native matrix users and guests are always counted. If the
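Note: mechanically, the report is a single flat JSON object sent by HTTP `PUT` to the configured endpoint (the deleted test further below captures exactly this call by stubbing `put_json` on the proxied HTTP client). A minimal sketch of the wire format, with an invented endpoint URL and illustrative values only:

```python
import json
import urllib.request

# Illustrative phone-home payload; keys mirror the statistics table above.
# The endpoint URL is hypothetical -- the real one comes from the
# `report_stats_endpoint` homeserver config option.
stats = {
    "homeserver": "example.com",
    "python_version": "3.11.4",
    "total_users": 42,
    "daily_active_users": 7,
    "database_engine": "psycopg2",
    "log_level": "INFO",
}

request = urllib.request.Request(
    "https://stats.example.com/report",
    data=json.dumps(stats).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PUT",
)
# urllib.request.urlopen(request) would actually send the report.
```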
@@ -34,22 +34,6 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger("synapse.app.homeserver")
 
-ONE_MINUTE_SECONDS = 60
-ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
-
-MILLISECONDS_PER_SECOND = 1000
-
-INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS
-"""
-We wait 5 minutes to send the first set of stats as the server can be quite busy the
-first few minutes
-"""
-
-PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS
-"""
-Phone home stats are sent every 3 hours
-"""
-
 # Contains the list of processes we will be monitoring
 # currently either 0 or 1
 _stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
@@ -137,9 +121,6 @@ async def phone_stats_home(
 
     room_count = await store.get_room_count()
    stats["total_room_count"] = room_count
-    stats["total_event_count"] = await store.count_total_events()
-    stats["total_message_count"] = await store.count_total_messages()
-    stats["total_e2ee_event_count"] = await store.count_total_e2ee_events()
 
     stats["daily_active_users"] = common_metrics.daily_active_users
     stats["monthly_active_users"] = await store.count_monthly_users()
@@ -204,14 +185,12 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
     # If you increase the loop period, the accuracy of user_daily_visits
     # table will decrease
     clock.looping_call(
-        hs.get_datastores().main.generate_user_daily_visits,
-        5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
+        hs.get_datastores().main.generate_user_daily_visits, 5 * 60 * 1000
     )
 
     # monthly active user limiting functionality
     clock.looping_call(
-        hs.get_datastores().main.reap_monthly_active_users,
-        ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND,
+        hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60
     )
     hs.get_datastores().main.reap_monthly_active_users()
 
@@ -237,20 +216,12 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
 
     if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
         generate_monthly_active_users()
-        clock.looping_call(
-            generate_monthly_active_users,
-            5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
-        )
+        clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
     # End of monthly active user settings
 
     if hs.config.metrics.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
-        clock.looping_call(
-            phone_stats_home,
-            PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND,
-            hs,
-            stats,
-        )
+        clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)
 
     # We need to defer this init for the cases that we daemonize
     # otherwise the process ID we get is that of the non-daemon process
@@ -258,6 +229,4 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
 
     # We wait 5 minutes to send the first set of stats as the server can
     # be quite busy the first few minutes
-    clock.call_later(
-        INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS, phone_stats_home, hs, stats
-    )
+    clock.call_later(5 * 60, phone_stats_home, hs, stats)
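Note on units in the hunks above: `clock.looping_call` appears to take its interval in milliseconds (hence every `* 1000` and the deleted `MILLISECONDS_PER_SECOND` constant), while `clock.call_later` takes seconds (`5 * 60` is passed through unscaled). A rough standalone equivalent using plain Twisted, whose own APIs take seconds, for illustration only:

```python
from twisted.internet import reactor, task


def phone_stats_home() -> None:
    # Stand-in for the real stats-reporting function.
    print("reporting stats...")


# Twisted's LoopingCall takes seconds; Synapse's Clock wrapper takes
# milliseconds, which is what `3 * 60 * 60 * 1000` above encodes.
loop = task.LoopingCall(phone_stats_home)
loop.start(3 * 60 * 60, now=False)  # every 3 hours, skipping an immediate run

# One-off delayed first report, analogous to `clock.call_later(5 * 60, ...)`.
reactor.callLater(5 * 60, phone_stats_home)
# reactor.run()  # would start the event loop and let both fire
```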
@@ -47,7 +47,7 @@ from synapse.storage.databases.main.events_worker import (
 )
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
 from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
@@ -311,12 +311,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
             self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
         )
 
-        # Add a background update to add triggers which track event counts.
-        self.db_pool.updates.register_background_update_handler(
-            _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
-            self._event_stats_populate_counts_bg_update,
-        )
-
         # We want this to run on the main database at startup before we start processing
         # events.
         #
@@ -2553,288 +2547,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS

        return num_rows

    async def _event_stats_populate_counts_bg_update(
        self, progress: JsonDict, batch_size: int
    ) -> int:
        """
        Background update to populate the `event_stats` table with initial
        values, and register DB triggers to continue updating it.

        We first register TRIGGERs on rows being added/removed from the `events` table,
        which will keep the event counts continuously updated. We also mark the stopping
        point for the main population step so we don't double count events.

        Then we will iterate through the `events` table in batches and update event
        counts until we reach the stopping point.

        This data is intended to be used by the phone-home stats to keep track
        of total event and message counts. A trigger is preferred to counting
        rows in the `events` table, as said table can grow quite large.

        It is also preferable to adding an index on the `events` table, as even
        an index can grow large. And calculating total counts would require
        querying that entire index.
        """
        # The last event `stream_ordering` we processed (starting place of this next
        # batch).
        last_event_stream_ordering = progress.get(
            "last_event_stream_ordering", -(1 << 31)
        )
        # The event `stream_ordering` we should stop at. This is used to avoid double
        # counting events that are already accounted for because of the triggers.
        stop_event_stream_ordering: Optional[int] = progress.get(
            "stop_event_stream_ordering", None
        )

        def _add_triggers_txn(
            txn: LoggingTransaction,
        ) -> Optional[int]:
            """
            Adds the triggers to the `events` table to keep the `event_stats` counts
            up-to-date.

            Also populates the `stop_event_stream_ordering` background update progress
            value. This marks the point at which we added the triggers, so we can avoid
            double counting events that are already accounted for in the population
            step.

            Returns:
                The latest event `stream_ordering` in the `events` table when the triggers
                were added or `None` if the `events` table is empty.
            """

            # Each time an event is inserted into the `events` table, update the stats.
            #
            # We're using `AFTER` triggers as we want to count successful inserts/deletes and
            # not the ones that could potentially fail.
            if isinstance(txn.database_engine, Sqlite3Engine):
                txn.execute(
                    """
                    CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger
                    AFTER INSERT ON events
                    BEGIN
                        -- Always increment total_event_count
                        UPDATE event_stats SET total_event_count = total_event_count + 1;

                        -- Increment unencrypted_message_count for m.room.message events
                        UPDATE event_stats
                        SET unencrypted_message_count = unencrypted_message_count + 1
                        WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;

                        -- Increment e2ee_event_count for m.room.encrypted events
                        UPDATE event_stats
                        SET e2ee_event_count = e2ee_event_count + 1
                        WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL;
                    END;
                    """
                )

                txn.execute(
                    """
                    CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger
                    AFTER DELETE ON events
                    BEGIN
                        -- Always decrement total_event_count
                        UPDATE event_stats SET total_event_count = total_event_count - 1;

                        -- Decrement unencrypted_message_count for m.room.message events
                        UPDATE event_stats
                        SET unencrypted_message_count = unencrypted_message_count - 1
                        WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL;

                        -- Decrement e2ee_event_count for m.room.encrypted events
                        UPDATE event_stats
                        SET e2ee_event_count = e2ee_event_count - 1
                        WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL;
                    END;
                    """
                )
            elif isinstance(txn.database_engine, PostgresEngine):
                txn.execute(
                    """
                    CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$
                    BEGIN
                        IF TG_OP = 'INSERT' THEN
                            -- Always increment total_event_count
                            UPDATE event_stats SET total_event_count = total_event_count + 1;

                            -- Increment unencrypted_message_count for m.room.message events
                            IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN
                                UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1;
                            END IF;

                            -- Increment e2ee_event_count for m.room.encrypted events
                            IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN
                                UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1;
                            END IF;

                            -- We're not modifying the row being inserted/deleted, so we return it unchanged.
                            RETURN NEW;

                        ELSIF TG_OP = 'DELETE' THEN
                            -- Always decrement total_event_count
                            UPDATE event_stats SET total_event_count = total_event_count - 1;

                            -- Decrement unencrypted_message_count for m.room.message events
                            IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN
                                UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1;
                            END IF;

                            -- Decrement e2ee_event_count for m.room.encrypted events
                            IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN
                                UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1;
                            END IF;

                            -- "The usual idiom in DELETE triggers is to return OLD."
                            -- (https://www.postgresql.org/docs/current/plpgsql-trigger.html)
                            RETURN OLD;
                        END IF;

                        RAISE EXCEPTION 'update_event_stats() was run with unexpected operation (%%). '
                            'This indicates a trigger misconfiguration as this function should only'
                            'run with INSERT/DELETE operations.', TG_OP;
                    END;
                    $BODY$ LANGUAGE plpgsql;
                    """
                )

                # We could use `CREATE OR REPLACE TRIGGER` but that's only available in Postgres
                # 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html)
                txn.execute(
                    """
                    DO
                    $$BEGIN
                        CREATE TRIGGER event_stats_increment_counts_trigger
                        AFTER INSERT OR DELETE ON events
                        FOR EACH ROW
                        EXECUTE PROCEDURE event_stats_increment_counts();
                    EXCEPTION
                        -- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres
                        WHEN duplicate_object THEN
                            NULL;
                    END;$$;
                    """
                )
            else:
                raise NotImplementedError("Unknown database engine")
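Note: the `duplicate_object` handler above is what makes trigger creation safely re-runnable on Postgres, where `CREATE TRIGGER IF NOT EXISTS` only arrived in version 14. SQLite has the `IF NOT EXISTS` form natively, so the same idempotency is easy to demonstrate (toy schema, for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY)")
conn.execute("CREATE TABLE event_stats (total_event_count INTEGER DEFAULT 0)")

create_trigger = """
    CREATE TRIGGER IF NOT EXISTS events_insert_trigger AFTER INSERT ON events
    BEGIN
        UPDATE event_stats SET total_event_count = total_event_count + 1;
    END;
"""

# Background updates can be retried, so trigger creation must be idempotent:
# running this twice is harmless thanks to IF NOT EXISTS (on Postgres the
# DO $$ ... EXCEPTION WHEN duplicate_object ... $$ block plays the same role).
conn.execute(create_trigger)
conn.execute(create_trigger)  # no "trigger already exists" error
```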
            # Find the latest `stream_ordering` in the `events` table. We need to do
            # this in the same transaction as where we add the triggers so we don't miss
            # any events.
            txn.execute(
                """
                SELECT stream_ordering
                FROM events
                ORDER BY stream_ordering DESC
                LIMIT 1
                """
            )
            row = cast(Optional[Tuple[int]], txn.fetchone())

            # Update the progress
            if row is not None:
                (max_stream_ordering,) = row
                self.db_pool.updates._background_update_progress_txn(
                    txn,
                    _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    {"stop_event_stream_ordering": max_stream_ordering},
                )
                return max_stream_ordering

            return None

        # First, add the triggers to keep the `event_stats` values up-to-date.
        #
        # If we don't have a `stop_event_stream_ordering` yet, we need to add the
        # triggers to the `events` table and set the stopping point so we don't
        # double count `events` later.
        if stop_event_stream_ordering is None:
            stop_event_stream_ordering = await self.db_pool.runInteraction(
                "_event_stats_populate_counts_bg_update_add_triggers",
                _add_triggers_txn,
            )

            # If there is no `stop_event_stream_ordering`, then there are no events
            # in the `events` table and we can end the background update altogether.
            if stop_event_stream_ordering is None:
                await self.db_pool.updates._end_background_update(
                    _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
                )
                return batch_size

        def _populate_txn(
            txn: LoggingTransaction,
        ) -> int:
            """
            Updates the `event_stats` table from this batch of events.
            """

            # Increment the counts based on the events present in this batch.
            txn.execute(
                """
                WITH event_batch AS (
                    SELECT *
                    FROM events
                    WHERE stream_ordering > ? AND stream_ordering <= ?
                    ORDER BY stream_ordering ASC
                    LIMIT ?
                ),
                batch_stats AS (
                    SELECT
                        MAX(stream_ordering) AS max_stream_ordering,
                        COALESCE(COUNT(*), 0) AS total_event_count,
                        COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count,
                        COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count
                    FROM event_batch

                    UNION ALL

                    SELECT null, 0, 0, 0
                    WHERE NOT EXISTS (SELECT 1 FROM event_batch)
                    LIMIT 1
                )
                UPDATE event_stats
                SET
                    total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats),
                    unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats),
                    e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats)
                RETURNING
                    (SELECT total_event_count FROM batch_stats) AS total_event_count,
                    (SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering
                """,
                (last_event_stream_ordering, stop_event_stream_ordering, batch_size),
            )

            # Get the results of the update
            (total_event_count, max_stream_ordering) = cast(
                Tuple[int, Optional[int]], txn.fetchone()
            )

            # Update the progress
            self.db_pool.updates._background_update_progress_txn(
                txn,
                _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                {
                    "last_event_stream_ordering": max_stream_ordering,
                    "stop_event_stream_ordering": stop_event_stream_ordering,
                },
            )

            return total_event_count

        num_rows_processed = await self.db_pool.runInteraction(
            "_event_stats_populate_counts_bg_update",
            _populate_txn,
        )

        # No more rows to process, so our background update is complete.
        if not num_rows_processed:
            await self.db_pool.updates._end_background_update(
                _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
            )

        return batch_size
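One subtlety in `_populate_txn` above: the `UNION ALL ... WHERE NOT EXISTS` arm ensures `batch_stats` contributes a well-defined all-zero row even when the batch is empty, so the surrounding `UPDATE ... RETURNING` always yields a row, and a zero `total_event_count` is what tells the caller to end the update. The trick in isolation, runnable against SQLite:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_batch (stream_ordering INTEGER, type TEXT)")

# The UNION ALL arm below supplies an explicit all-zero fallback row when
# `event_batch` is empty, mirroring the batch_stats CTE in the hunk above.
query = """
    SELECT
        MAX(stream_ordering) AS max_stream_ordering,
        COALESCE(COUNT(*), 0) AS total_event_count
    FROM event_batch
    UNION ALL
    SELECT null, 0
    WHERE NOT EXISTS (SELECT 1 FROM event_batch)
    LIMIT 1
"""

print(conn.execute(query).fetchall())  # [(None, 0)] -- exactly one row
conn.execute("INSERT INTO event_batch VALUES (1, 'm.room.message')")
print(conn.execute(query).fetchall())  # [(1, 1)]
```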


def _resolve_stale_data_in_sliding_sync_tables(
    txn: LoggingTransaction,
@@ -126,44 +126,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
 
         return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages)
 
-    async def count_total_events(self) -> int:
-        """
-        Returns the total number of events present on the server.
-        """
-
-        return await self.db_pool.simple_select_one_onecol(
-            table="event_stats",
-            keyvalues={},
-            retcol="total_event_count",
-            desc="count_total_events",
-        )
-
-    async def count_total_messages(self) -> int:
-        """
-        Returns the total number of `m.room.message` events present on the
-        server.
-        """
-
-        return await self.db_pool.simple_select_one_onecol(
-            table="event_stats",
-            keyvalues={},
-            retcol="unencrypted_message_count",
-            desc="count_total_messages",
-        )
-
-    async def count_total_e2ee_events(self) -> int:
-        """
-        Returns the total number of `m.room.encrypted` events present on the
-        server.
-        """
-
-        return await self.db_pool.simple_select_one_onecol(
-            table="event_stats",
-            keyvalues={},
-            retcol="e2ee_event_count",
-            desc="count_total_e2ee_events",
-        )
-
     async def count_daily_sent_e2ee_messages(self) -> int:
         def _count_messages(txn: LoggingTransaction) -> int:
             # This is good enough as if you have silly characters in your own
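Each of the removed counters is a read of one column from the single row of `event_stats`, i.e. an O(1) lookup instead of a `COUNT(*)` scan over the ever-growing `events` table. Roughly the SQL that `simple_select_one_onecol` reduces to here, sketched with SQLite (seed values borrowed from the tests below):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE event_stats (
        total_event_count INTEGER NOT NULL DEFAULT 0,
        unencrypted_message_count INTEGER NOT NULL DEFAULT 0,
        e2ee_event_count INTEGER NOT NULL DEFAULT 0
    );
    INSERT INTO event_stats VALUES (24, 10, 5);
    """
)

# Roughly what `simple_select_one_onecol(table="event_stats", retcol=...)`
# boils down to: select one column from the table's single row.
(total_event_count,) = conn.execute(
    "SELECT total_event_count FROM event_stats"
).fetchone()
print(total_event_count)  # 24
```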
@@ -19,7 +19,7 @@
 #
 #
 
-SCHEMA_VERSION = 92  # remember to update the list below when updating
+SCHEMA_VERSION = 91  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -162,12 +162,6 @@ Changes in SCHEMA_VERSION = 89
 Changes in SCHEMA_VERSION = 90
 - Add a column `participant` to `room_memberships` table
 - Add background update to delete unreferenced state groups.
 
 Changes in SCHEMA_VERSION = 91
 - TODO
 
-Changes in SCHEMA_VERSION = 92
-- Add `event_stats` table to store global event statistics like total counts
 """

@@ -1,33 +0,0 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.


-- Create the `event_stats` table to store these statistics.
CREATE TABLE event_stats (
    total_event_count INTEGER NOT NULL DEFAULT 0,
    unencrypted_message_count INTEGER NOT NULL DEFAULT 0,
    e2ee_event_count INTEGER NOT NULL DEFAULT 0
);

-- Insert initial values into the table.
INSERT INTO event_stats (
    total_event_count,
    unencrypted_message_count,
    e2ee_event_count
) VALUES (0, 0, 0);

-- Add a background update to populate the `event_stats` table with the current counts
-- from the `events` table and add triggers to keep this count up-to-date.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
  (9201, 'event_stats_populate_counts_bg_update', '{}');
@@ -52,5 +52,3 @@ class _BackgroundUpdates:
     MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE = (
         "mark_unreferenced_state_groups_for_deletion_bg_update"
     )
-
-    EVENT_STATS_POPULATE_COUNTS_BG_UPDATE = "event_stats_populate_counts_bg_update"
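For orientation: this deleted constant is the rendezvous point between the schema delta above, which queues a `background_updates` row by name, and `register_background_update_handler`, which maps that same name to the coroutine doing the work. A skeletal sketch of that dispatch, assuming a plain dict registry rather than Synapse's real machinery:

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple

# Hypothetical registry standing in for the background updater's internal one.
HANDLERS: Dict[str, Callable[[dict, int], Awaitable[int]]] = {}


def register_background_update_handler(
    name: str, handler: Callable[[dict, int], Awaitable[int]]
) -> None:
    HANDLERS[name] = handler


async def example_update(progress: dict, batch_size: int) -> int:
    # A real handler processes one batch, persists progress, and reports how
    # many rows it handled; zero lets the runner finish the update.
    return 0


register_background_update_handler(
    "event_stats_populate_counts_bg_update", example_update
)


async def run_queued_updates(queued: Tuple[Tuple[str, dict], ...]) -> None:
    # `queued` stands in for rows of the `background_updates` table.
    for name, progress in queued:
        await HANDLERS[name](progress, 100)


asyncio.run(run_queued_updates((("event_stats_populate_counts_bg_update", {}),)))
```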
@@ -1,258 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.

import logging
from unittest.mock import AsyncMock

from twisted.test.proto_helpers import MemoryReactor

from synapse.app.phone_stats_home import (
    PHONE_HOME_INTERVAL_SECONDS,
    start_phone_stats_home,
)
from synapse.rest import admin, login, register, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock

from tests import unittest
from tests.server import ThreadedMemoryReactorClock

TEST_REPORT_STATS_ENDPOINT = "https://fake.endpoint/stats"
TEST_SERVER_CONTEXT = "test-server-context"


class PhoneHomeStatsTestCase(unittest.HomeserverTestCase):
    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room.register_servlets,
        register.register_servlets,
        login.register_servlets,
    ]

    def make_homeserver(
        self, reactor: ThreadedMemoryReactorClock, clock: Clock
    ) -> HomeServer:
        # Configure the homeserver to enable stats reporting.
        config = self.default_config()
        config["report_stats"] = True
        config["report_stats_endpoint"] = TEST_REPORT_STATS_ENDPOINT

        # Configure the server context so we can check it ends up being reported
        config["server_context"] = TEST_SERVER_CONTEXT

        # Allow guests to be registered
        config["allow_guest_access"] = True

        hs = self.setup_test_homeserver(config=config)

        # Replace the proxied http client with a mock, so we can inspect outbound requests to
        # the configured stats endpoint.
        self.put_json_mock = AsyncMock(return_value={})
        hs.get_proxied_http_client().put_json = self.put_json_mock  # type: ignore[method-assign]
        return hs

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main

        # Wait for the background updates to add the database triggers that keep the
        # `event_stats` table up-to-date.
        self.wait_for_background_updates()

        # Force stats reporting to occur
        start_phone_stats_home(hs=hs)

        super().prepare(reactor, clock, hs)

    def _get_latest_phone_home_stats(self) -> JsonDict:
        # Wait for `phone_stats_home` to be called again + a healthy margin (50s).
        self.reactor.advance(2 * PHONE_HOME_INTERVAL_SECONDS + 50)

        # Extract the reported stats from our http client mock
        mock_calls = self.put_json_mock.call_args_list
        report_stats_calls = []
        for call in mock_calls:
            if call.args[0] == TEST_REPORT_STATS_ENDPOINT:
                report_stats_calls.append(call)

        self.assertGreaterEqual(
            (len(report_stats_calls)),
            1,
            "Expected at-least one call to the report_stats endpoint",
        )

        # Extract the phone home stats from the call
        phone_home_stats = report_stats_calls[0].args[1]

        return phone_home_stats

    def _perform_user_actions(self) -> None:
        """
        Perform some actions on the homeserver that would bump the phone home
        stats.
        """

        # Create some users
        user_1_mxid = self.register_user(
            username="test_user_1",
            password="test",
        )
        user_2_mxid = self.register_user(
            username="test_user_2",
            password="test",
        )
        # Note: `self.register_user` does not support guest registration, and updating the
        # Admin API it calls to add a new parameter would cause the `mac` parameter to fail
        # in a backwards-incompatible manner. Hence, we make a manual request here.
        _guest_user_mxid = self.make_request(
            method="POST",
            path="/_matrix/client/v3/register?kind=guest",
            content={
                "username": "guest_user",
                "password": "test",
            },
            shorthand=False,
        )

        # Log in to each user
        user_1_token = self.login(username=user_1_mxid, password="test")
        user_2_token = self.login(username=user_2_mxid, password="test")

        # Create a room between the two users
        room_1_id = self.helper.create_room_as(
            is_public=False,
            tok=user_1_token,
        )

        # Mark this room as end-to-end encrypted
        self.helper.send_state(
            room_id=room_1_id,
            event_type="m.room.encryption",
            body={
                "algorithm": "m.megolm.v1.aes-sha2",
                "rotation_period_ms": 604800000,
                "rotation_period_msgs": 100,
            },
            state_key="",
            tok=user_1_token,
        )

        # User 1 invites user 2
        self.helper.invite(
            room=room_1_id,
            src=user_1_mxid,
            targ=user_2_mxid,
            tok=user_1_token,
        )

        # User 2 joins
        self.helper.join(
            room=room_1_id,
            user=user_2_mxid,
            tok=user_2_token,
        )

        # User 1 sends 10 unencrypted messages
        for _ in range(10):
            self.helper.send(
                room_id=room_1_id,
                body="Zoinks Scoob! A message!",
                tok=user_1_token,
            )

        # User 2 sends 5 encrypted "messages"
        for _ in range(5):
            self.helper.send_event(
                room_id=room_1_id,
                type="m.room.encrypted",
                content={
                    "algorithm": "m.olm.v1.curve25519-aes-sha2",
                    "sender_key": "some_key",
                    "ciphertext": {
                        "some_key": {
                            "type": 0,
                            "body": "encrypted_payload",
                        },
                    },
                },
                tok=user_2_token,
            )

    def test_phone_home_stats(self) -> None:
        """
        Test that the phone home stats contain the stats we expect based on
        the scenario carried out in `prepare`
        """
        # Do things to bump the stats
        self._perform_user_actions()

        # Wait for the stats to be reported
        phone_home_stats = self._get_latest_phone_home_stats()

        self.assertEqual(
            phone_home_stats["homeserver"], self.hs.config.server.server_name
        )

        self.assertTrue(isinstance(phone_home_stats["memory_rss"], int))
        self.assertTrue(isinstance(phone_home_stats["cpu_average"], int))

        self.assertEqual(phone_home_stats["server_context"], TEST_SERVER_CONTEXT)

        self.assertTrue(isinstance(phone_home_stats["timestamp"], int))
        self.assertTrue(isinstance(phone_home_stats["uptime_seconds"], int))
        self.assertTrue(isinstance(phone_home_stats["python_version"], str))

        # We expect only our test users to exist on the homeserver
        self.assertEqual(phone_home_stats["total_users"], 3)
        self.assertEqual(phone_home_stats["total_nonbridged_users"], 3)
        self.assertEqual(phone_home_stats["daily_user_type_native"], 2)
        self.assertEqual(phone_home_stats["daily_user_type_guest"], 1)
        self.assertEqual(phone_home_stats["daily_user_type_bridged"], 0)
        self.assertEqual(phone_home_stats["total_room_count"], 1)
        self.assertEqual(phone_home_stats["total_event_count"], 24)
        self.assertEqual(phone_home_stats["total_message_count"], 10)
        self.assertEqual(phone_home_stats["total_e2ee_event_count"], 5)
        self.assertEqual(phone_home_stats["daily_active_users"], 2)
        self.assertEqual(phone_home_stats["monthly_active_users"], 2)
        self.assertEqual(phone_home_stats["daily_active_rooms"], 1)
        self.assertEqual(phone_home_stats["daily_active_e2ee_rooms"], 1)
        self.assertEqual(phone_home_stats["daily_messages"], 10)
        self.assertEqual(phone_home_stats["daily_e2ee_messages"], 5)
        self.assertEqual(phone_home_stats["daily_sent_messages"], 10)
        self.assertEqual(phone_home_stats["daily_sent_e2ee_messages"], 5)

        # Our users have not been around for >30 days, hence these are all 0.
        self.assertEqual(phone_home_stats["r30v2_users_all"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_android"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_ios"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_electron"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_web"], 0)
        self.assertEqual(
            phone_home_stats["cache_factor"], self.hs.config.caches.global_factor
        )
        self.assertEqual(
            phone_home_stats["event_cache_size"],
            self.hs.config.caches.event_cache_size,
        )
        self.assertEqual(
            phone_home_stats["database_engine"],
            self.hs.config.database.databases[0].config["name"],
        )
        self.assertEqual(
            phone_home_stats["database_server_version"],
            self.hs.get_datastores().main.database_engine.server_version,
        )

        synapse_logger = logging.getLogger("synapse")
        log_level = synapse_logger.getEffectiveLevel()
        self.assertEqual(phone_home_stats["log_level"], logging.getLevelName(log_level))
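Note: the deleted test above leans on Twisted's deterministic test reactor; `self.reactor.advance(2 * PHONE_HOME_INTERVAL_SECONDS + 50)` fast-forwards virtual time so the 3-hour reporting loop fires without any real waiting. The underlying primitive in isolation:

```python
from twisted.internet import task

clock = task.Clock()  # a reactor whose time only moves when told to

fired = []
clock.callLater(3 * 60 * 60, lambda: fired.append("reported"))

clock.advance(60)  # one minute: nothing happens yet
assert fired == []

clock.advance(3 * 60 * 60)  # jump past the deadline: the call fires
assert fired == ["reported"]
```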
@@ -1,237 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.


from twisted.test.proto_helpers import MemoryReactor

from synapse.rest import admin, login, register, room
from synapse.server import HomeServer
from synapse.types.storage import _BackgroundUpdates
from synapse.util import Clock

from tests import unittest


class EventStatsTestCase(unittest.HomeserverTestCase):
    """
    Tests for the `event_stats` table
    """

    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room.register_servlets,
        register.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main

        # Wait for the background updates to add the database triggers that keep the
        # `event_stats` table up-to-date.
        #
        # This also prevents background updates running during the tests and messing
        # with the results.
        self.wait_for_background_updates()

        super().prepare(reactor, clock, hs)

    def _perform_user_actions(self) -> None:
        """
        Perform some actions on the homeserver that would bump the event counts.
        """
        # Create some users
        user_1_mxid = self.register_user(
            username="test_user_1",
            password="test",
        )
        user_2_mxid = self.register_user(
            username="test_user_2",
            password="test",
        )
        # Note: `self.register_user` does not support guest registration, and updating the
        # Admin API it calls to add a new parameter would cause the `mac` parameter to fail
        # in a backwards-incompatible manner. Hence, we make a manual request here.
        _guest_user_mxid = self.make_request(
            method="POST",
            path="/_matrix/client/v3/register?kind=guest",
            content={
                "username": "guest_user",
                "password": "test",
            },
            shorthand=False,
        )

        # Log in to each user
        user_1_token = self.login(username=user_1_mxid, password="test")
        user_2_token = self.login(username=user_2_mxid, password="test")

        # Create a room between the two users
        room_1_id = self.helper.create_room_as(
            is_public=False,
            tok=user_1_token,
        )

        # Mark this room as end-to-end encrypted
        self.helper.send_state(
            room_id=room_1_id,
            event_type="m.room.encryption",
            body={
                "algorithm": "m.megolm.v1.aes-sha2",
                "rotation_period_ms": 604800000,
                "rotation_period_msgs": 100,
            },
            state_key="",
            tok=user_1_token,
        )

        # User 1 invites user 2
        self.helper.invite(
            room=room_1_id,
            src=user_1_mxid,
            targ=user_2_mxid,
            tok=user_1_token,
        )

        # User 2 joins
        self.helper.join(
            room=room_1_id,
            user=user_2_mxid,
            tok=user_2_token,
        )

        # User 1 sends 10 unencrypted messages
        for _ in range(10):
            self.helper.send(
                room_id=room_1_id,
                body="Zoinks Scoob! A message!",
                tok=user_1_token,
            )

        # User 2 sends 5 encrypted "messages"
        for _ in range(5):
            self.helper.send_event(
                room_id=room_1_id,
                type="m.room.encrypted",
                content={
                    "algorithm": "m.olm.v1.curve25519-aes-sha2",
                    "sender_key": "some_key",
                    "ciphertext": {
                        "some_key": {
                            "type": 0,
                            "body": "encrypted_payload",
                        },
                    },
                },
                tok=user_2_token,
            )

    def test_background_update_with_events(self) -> None:
        """
        Test that the background update to populate the `event_stats` table works
        correctly when there are events in the database.
        """
        # Do things to bump the stats
        self._perform_user_actions()

        # Keep in mind: These are already populated as the background update has already
        # run once when Synapse started and added the database triggers which are
        # incrementing things as new events come in.
        self.assertEqual(self.get_success(self.store.count_total_events()), 24)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 10)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5)

        # Run the background update again
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                {
                    "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    "progress_json": "{}",
                },
            )
        )
        self.store.db_pool.updates._all_done = False
        self.wait_for_background_updates()

        # We expect these values to double as the background update is being run *again*
        # and will double-count the `events`.
        self.assertEqual(self.get_success(self.store.count_total_events()), 48)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 20)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 10)

    def test_background_update_without_events(self) -> None:
        """
        Test that the background update to populate the `event_stats` table works
        correctly without events in the database.
        """
        # Keep in mind: These are already populated as the background update has already
        # run once when Synapse started and added the database triggers which are
        # incrementing things as new events come in.
        #
        # In this case, no events have been sent, so we expect the counts to be 0.
        self.assertEqual(self.get_success(self.store.count_total_events()), 0)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)

        # Run the background update again
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                {
                    "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    "progress_json": "{}",
                },
            )
        )
        self.store.db_pool.updates._all_done = False
        self.wait_for_background_updates()

        self.assertEqual(self.get_success(self.store.count_total_events()), 0)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)

    def test_background_update_resume_progress(self) -> None:
        """
        Test that the background update to populate the `event_stats` table works
        correctly to resume from `progress_json`.
        """
        # Do things to bump the stats
        self._perform_user_actions()

        # Keep in mind: These are already populated as the background update has already
        # run once when Synapse started and added the database triggers which are
        # incrementing things as new events come in.
        self.assertEqual(self.get_success(self.store.count_total_events()), 24)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 10)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5)

        # Run the background update again
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                {
                    "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    "progress_json": '{ "last_event_stream_ordering": 14, "stop_event_stream_ordering": 21 }',
                },
            )
        )
        self.store.db_pool.updates._all_done = False
        self.wait_for_background_updates()

        # We expect these values to increase as the background update is run *again*
        # and will double-count some of the `events` over the range specified by the
        # `progress_json`: resuming from stream_ordering 14 up to the stop point at 21
        # re-counts 7 events (6 unencrypted messages and 1 encrypted event), hence
        # 24 + 7, 10 + 6, and 5 + 1 below.
        self.assertEqual(self.get_success(self.store.count_total_events()), 24 + 7)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 16)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 6)