Mirror of https://github.com/element-hq/synapse.git

Compare commits: quenting/l… → madlittlem… (9 commits)

- d0e2a45635
- 1cb9a0f59e
- 3c873f9d97
- 29c936f075
- 0b37f9f6bc
- ddf77b58df
- a0e94f7cbd
- eb89758266
- 7f044f9053

changelog.d/18371.feature (new file, 1 line)
@@ -0,0 +1 @@
Add `total_event_count`, `total_message_count`, and `total_e2ee_event_count` fields to the homeserver usage statistics.
@@ -30,10 +30,13 @@ The following statistics are sent to the configured reporting endpoint:
 | `python_version` | string | The Python version number in use (e.g "3.7.1"). Taken from `sys.version_info`. |
 | `total_users` | int | The number of registered users on the homeserver. |
 | `total_nonbridged_users` | int | The number of users, excluding those created by an Application Service. |
-| `daily_user_type_native` | int | The number of native users created in the last 24 hours. |
+| `daily_user_type_native` | int | The number of native, non-guest users created in the last 24 hours. |
 | `daily_user_type_guest` | int | The number of guest users created in the last 24 hours. |
 | `daily_user_type_bridged` | int | The number of users created by Application Services in the last 24 hours. |
 | `total_room_count` | int | The total number of rooms present on the homeserver. |
+| `total_event_count` | int | The total number of events present on the homeserver. |
+| `total_message_count` | int | The total number of non-state events with type `m.room.message` present on the homeserver. |
+| `total_e2ee_event_count` | int | The total number of non-state events with type `m.room.encrypted` present on the homeserver. This can be used as a slight over-estimate for the number of encrypted messages. |
 | `daily_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 24 hours. |
 | `monthly_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 30 days. |
 | `daily_active_rooms` | int | The number of rooms that have had a (state) event with the type `m.room.message` sent in them in the last 24 hours. |
@@ -50,8 +53,8 @@ The following statistics are sent to the configured reporting endpoint:
 | `cache_factor` | int | The configured [`global factor`](../../configuration/config_documentation.md#caching) value for caching. |
 | `event_cache_size` | int | The configured [`event_cache_size`](../../configuration/config_documentation.md#caching) value for caching. |
 | `database_engine` | string | The database engine that is in use. Either "psycopg2" meaning PostgreSQL is in use, or "sqlite3" for SQLite3. |
 | `database_server_version` | string | The version of the database server. Examples being "10.10" for PostgreSQL server version 10.10, and "3.38.5" for SQLite 3.38.5 installed on the system. |
 | `log_level` | string | The log level in use. Examples are "INFO", "WARNING", "ERROR", "DEBUG", etc. |

 [^1]: Native matrix users and guests are always counted. If the
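For illustration, a hypothetical (abridged) report built from the fields above might look like the following; the field names match the table, but the values are invented:

```python
# Hypothetical, abridged phone-home report; values are made up for illustration.
example_report = {
    "homeserver": "example.com",
    "total_users": 120,
    "total_room_count": 45,
    # The three new counters added by this change:
    "total_event_count": 250_000,
    "total_message_count": 90_000,
    "total_e2ee_event_count": 30_000,
    "daily_active_users": 17,
}
```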
@@ -34,6 +34,22 @@ if TYPE_CHECKING:

 logger = logging.getLogger("synapse.app.homeserver")

+ONE_MINUTE_SECONDS = 60
+ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
+
+MILLISECONDS_PER_SECOND = 1000
+
+INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS
+"""
+We wait 5 minutes to send the first set of stats as the server can be quite busy the
+first few minutes
+"""
+
+PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS
+"""
+Phone home stats are sent every 3 hours
+"""
+
 # Contains the list of processes we will be monitoring
 # currently either 0 or 1
 _stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
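The named constants above replace bare numbers like `5 * 60 * 1000` in the scheduling calls later in this diff. A quick sanity check of the arithmetic, assuming (as those calls suggest) that `Clock.looping_call` takes its period in milliseconds:

```python
ONE_MINUTE_SECONDS = 60
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
MILLISECONDS_PER_SECOND = 1000

# user_daily_visits / monthly active users: every 5 minutes
assert 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND == 300_000
# reap_monthly_active_users: every hour
assert ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND == 3_600_000
# phone home stats: every 3 hours
assert 3 * ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND == 10_800_000
```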
@@ -121,6 +137,9 @@ async def phone_stats_home(

     room_count = await store.get_room_count()
     stats["total_room_count"] = room_count
+    stats["total_event_count"] = await store.count_total_events()
+    stats["total_message_count"] = await store.count_total_messages()
+    stats["total_e2ee_event_count"] = await store.count_total_e2ee_events()

     stats["daily_active_users"] = common_metrics.daily_active_users
     stats["monthly_active_users"] = await store.count_monthly_users()
@@ -185,12 +204,14 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
     # If you increase the loop period, the accuracy of user_daily_visits
     # table will decrease
     clock.looping_call(
-        hs.get_datastores().main.generate_user_daily_visits, 5 * 60 * 1000
+        hs.get_datastores().main.generate_user_daily_visits,
+        5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
     )

     # monthly active user limiting functionality
     clock.looping_call(
-        hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60
+        hs.get_datastores().main.reap_monthly_active_users,
+        ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND,
     )
     hs.get_datastores().main.reap_monthly_active_users()
@@ -216,12 +237,20 @@ def start_phone_stats_home(hs: "HomeServer") -> None:

     if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
         generate_monthly_active_users()
-        clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+        clock.looping_call(
+            generate_monthly_active_users,
+            5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
+        )
     # End of monthly active user settings

     if hs.config.metrics.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
-        clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)
+        clock.looping_call(
+            phone_stats_home,
+            PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND,
+            hs,
+            stats,
+        )

     # We need to defer this init for the cases that we daemonize
     # otherwise the process ID we get is that of the non-daemon process
@@ -229,4 +258,6 @@ def start_phone_stats_home(hs: "HomeServer") -> None:

     # We wait 5 minutes to send the first set of stats as the server can
     # be quite busy the first few minutes
-    clock.call_later(5 * 60, phone_stats_home, hs, stats)
+    clock.call_later(
+        INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS, phone_stats_home, hs, stats
+    )
@@ -378,6 +378,8 @@ class RoomSendEventRestServlet(TransactionRestServlet):
     ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)

+        logger.info("asdf event send in %s (%s)", room_id, content)
+
         origin_server_ts = None
         if requester.app_service:
             origin_server_ts = parse_integer(request, "ts")

@@ -419,6 +421,8 @@ class RoomSendEventRestServlet(TransactionRestServlet):
         except ShadowBanError:
             event_id = generate_fake_event_id()

+        logger.info("asdf event send DONE in %s (%s) -> %s", room_id, content, event_id)
+
         set_tag("event_id", event_id)
         return 200, {"event_id": event_id}
@@ -47,7 +47,7 @@ from synapse.storage.databases.main.events_worker import (
 )
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.storage.types import Cursor
 from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
 from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
@@ -311,6 +311,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
             self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
         )

+        # Add a background update to add triggers which track event counts.
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
+            self._event_stats_populate_counts_bg_update,
+        )
+
         # We want this to run on the main database at startup before we start processing
         # events.
         #
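As context for the handler registered above: Synapse's background-update machinery calls a registered handler repeatedly with the saved `progress` JSON and a `batch_size`, and the handler resumes where it left off, does a bounded amount of work, and eventually ends the update. A toy, self-contained simulation of that contract (the "database" here is just a list; this is not Synapse's API):

```python
# Self-contained simulation of the batched background-update contract; names
# like `progress` and `batch_size` mirror the real handler, everything else is
# a stand-in.
from typing import Dict, List

events: List[int] = list(range(1, 101))  # stand-in for the `events` table
stats = {"total_event_count": 0}
progress: Dict[str, int] = {}

def run_one_batch(progress: Dict[str, int], batch_size: int) -> int:
    """One invocation of the handler: resume from progress, do bounded work."""
    last_pos = progress.get("last_event_stream_ordering", 0)
    batch = [e for e in events if e > last_pos][:batch_size]
    if not batch:
        return 0  # done; the real code would call _end_background_update here
    stats["total_event_count"] += len(batch)
    # Persist progress so a restart resumes from here.
    progress["last_event_stream_ordering"] = max(batch)
    return len(batch)

while run_one_batch(progress, batch_size=30):
    pass
assert stats["total_event_count"] == len(events)
```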
@@ -2547,6 +2553,327 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):

        return num_rows

    async def _event_stats_populate_counts_bg_update(
        self, progress: JsonDict, batch_size: int
    ) -> int:
        """
        Background update to populate the `event_stats` table with initial
        values, and register DB triggers to continue updating it.

        We first register TRIGGERs on rows being added/removed from the `events` table,
        which will keep the event counts continuously updated. We also mark the stopping
        point for the main population step so we don't double count events.

        Then we will iterate through the `events` table in batches and update event
        counts until we reach the stopping point.

        It's safe to run this background update multiple times (start with an empty
        `progress_json`).

        This data is intended to be used by the phone-home stats to keep track
        of total event and message counts. A trigger is preferred to counting
        rows in the `events` table, as said table can grow quite large.

        It is also preferable to adding an index on the `events` table, as even
        an index can grow large. And calculating total counts would require
        querying that entire index.
        """
        # The last event `stream_ordering` we processed (starting place of this next
        # batch). Since we're processing things in `stream_ordering` ascending order,
        # this should be the maximum `stream_ordering` we processed.
        last_event_stream_ordering = progress.get(
            "last_event_stream_ordering", -(1 << 31)
        )
        # The event `stream_ordering` we should stop at. This is used to avoid double
        # counting events that are already accounted for because of the triggers.
        stop_event_stream_ordering: Optional[int] = progress.get(
            "stop_event_stream_ordering", None
        )

        def _add_triggers_txn(
            txn: LoggingTransaction,
        ) -> Optional[int]:
            """
            Adds the triggers to the `events` table to keep the `event_stats` counts
            up-to-date.

            Also populates the `stop_event_stream_ordering` background update progress
            value. This marks the point at which we added the triggers, so we can avoid
            double counting events that are already accounted for in the population
            step.

            Returns:
                The latest event `stream_ordering` in the `events` table when the
                triggers were added, or `None` if the `events` table is empty.
            """
            # Clear the `event_stats` table so we can start fresh (the background
            # update may have been run before)
            txn.execute(
                """
                UPDATE event_stats
                SET
                    total_event_count = 0,
                    unencrypted_message_count = 0,
                    e2ee_event_count = 0
                """
            )

            # Each time an event is inserted into the `events` table, update the stats.
            #
            # We're using `AFTER` triggers as we want to count successful
            # inserts/deletes and not the ones that could potentially fail.
            if isinstance(txn.database_engine, Sqlite3Engine):
                txn.execute(
                    """
                    CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger
                    AFTER INSERT ON events
                    BEGIN
                        -- Always increment total_event_count
                        UPDATE event_stats SET total_event_count = total_event_count + 1;

                        -- Increment unencrypted_message_count for m.room.message events
                        UPDATE event_stats
                        SET unencrypted_message_count = unencrypted_message_count + 1
                        WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;

                        -- Increment e2ee_event_count for m.room.encrypted events
                        UPDATE event_stats
                        SET e2ee_event_count = e2ee_event_count + 1
                        WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL;
                    END;
                    """
                )

                txn.execute(
                    """
                    CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger
                    AFTER DELETE ON events
                    BEGIN
                        -- Always decrement total_event_count
                        UPDATE event_stats SET total_event_count = total_event_count - 1;

                        -- Decrement unencrypted_message_count for m.room.message events
                        UPDATE event_stats
                        SET unencrypted_message_count = unencrypted_message_count - 1
                        WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL;

                        -- Decrement e2ee_event_count for m.room.encrypted events
                        UPDATE event_stats
                        SET e2ee_event_count = e2ee_event_count - 1
                        WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL;
                    END;
                    """
                )
            elif isinstance(txn.database_engine, PostgresEngine):
                txn.execute(
                    """
                    CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$
                    BEGIN
                        IF TG_OP = 'INSERT' THEN
                            -- Always increment total_event_count
                            UPDATE event_stats SET total_event_count = total_event_count + 1;

                            -- Increment unencrypted_message_count for m.room.message events
                            IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN
                                UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1;
                            END IF;

                            -- Increment e2ee_event_count for m.room.encrypted events
                            IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN
                                UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1;
                            END IF;

                            -- We're not modifying the row being inserted/deleted, so we return it unchanged.
                            RETURN NEW;

                        ELSIF TG_OP = 'DELETE' THEN
                            -- Always decrement total_event_count
                            UPDATE event_stats SET total_event_count = total_event_count - 1;

                            -- Decrement unencrypted_message_count for m.room.message events
                            IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN
                                UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1;
                            END IF;

                            -- Decrement e2ee_event_count for m.room.encrypted events
                            IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN
                                UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1;
                            END IF;

                            -- "The usual idiom in DELETE triggers is to return OLD."
                            -- (https://www.postgresql.org/docs/current/plpgsql-trigger.html)
                            RETURN OLD;
                        END IF;

                        RAISE EXCEPTION 'event_stats_increment_counts() was run with unexpected operation (%%). '
                            'This indicates a trigger misconfiguration as this function should only '
                            'run with INSERT/DELETE operations.', TG_OP;
                    END;
                    $BODY$ LANGUAGE plpgsql;
                    """
                )

                # We could use `CREATE OR REPLACE TRIGGER` but that's only available in
                # Postgres 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html)
                txn.execute(
                    """
                    DO
                    $$BEGIN
                        CREATE TRIGGER event_stats_increment_counts_trigger
                        AFTER INSERT OR DELETE ON events
                        FOR EACH ROW
                        EXECUTE PROCEDURE event_stats_increment_counts();
                    EXCEPTION
                        -- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres
                        WHEN duplicate_object THEN
                            NULL;
                    END;$$;
                    """
                )
            else:
                raise NotImplementedError("Unknown database engine")

            # Find the latest `stream_ordering` in the `events` table. We need to do
            # this in the same transaction as where we add the triggers so we don't
            # miss any events.
            txn.execute(
                """
                SELECT stream_ordering
                FROM events
                ORDER BY stream_ordering DESC
                LIMIT 1
                """
            )
            row = cast(Optional[Tuple[int]], txn.fetchone())

            # Update the progress
            if row is not None:
                (max_stream_ordering,) = row
                self.db_pool.updates._background_update_progress_txn(
                    txn,
                    _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    {"stop_event_stream_ordering": max_stream_ordering},
                )
                return max_stream_ordering

            return None

        # First, add the triggers to keep the `event_stats` values up-to-date.
        #
        # If we don't have a `stop_event_stream_ordering` yet, we need to add the
        # triggers to the `events` table and set the stopping point so we don't
        # double count `events` later.
        if stop_event_stream_ordering is None:
            stop_event_stream_ordering = await self.db_pool.runInteraction(
                "_event_stats_populate_counts_bg_update_add_triggers",
                _add_triggers_txn,
            )

            # If there is no `stop_event_stream_ordering`, then there are no events
            # in the `events` table and we can end the background update altogether.
            if stop_event_stream_ordering is None:
                await self.db_pool.updates._end_background_update(
                    _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
                )
                return batch_size

        def _populate_txn(
            txn: LoggingTransaction,
        ) -> int:
            """
            Updates the `event_stats` table from this batch of events.
            """

            # Increment the counts based on the events present in this batch.
            update_event_stats_sql = """
                WITH event_batch AS (
                    SELECT *
                    FROM events
                    WHERE stream_ordering > ? AND stream_ordering <= ?
                    ORDER BY stream_ordering ASC
                    LIMIT ?
                ),
                batch_stats AS (
                    SELECT
                        MAX(stream_ordering) AS max_stream_ordering,
                        COALESCE(COUNT(*), 0) AS total_event_count,
                        COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count,
                        COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count
                    FROM event_batch

                    UNION ALL

                    SELECT null, 0, 0, 0
                    WHERE NOT EXISTS (SELECT 1 FROM event_batch)
                    LIMIT 1
                )
                UPDATE event_stats
                SET
                    total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats),
                    unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats),
                    e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats)
            """
            if self.db_pool.engine.supports_returning:
                txn.execute(
                    f"""
                    {update_event_stats_sql}
                    RETURNING
                        (SELECT total_event_count FROM batch_stats) AS total_event_count,
                        (SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering
                    """,
                    (
                        last_event_stream_ordering,
                        stop_event_stream_ordering,
                        batch_size,
                    ),
                )
            else:
                txn.execute(
                    update_event_stats_sql,
                    (
                        last_event_stream_ordering,
                        stop_event_stream_ordering,
                        batch_size,
                    ),
                )
                # This engine does not support `RETURNING`, so separately work out
                # what the batch we just counted looked like.
                txn.execute(
                    """
                    SELECT COUNT(*), MAX(stream_ordering)
                    FROM (
                        SELECT stream_ordering
                        FROM events
                        WHERE stream_ordering > ? AND stream_ordering <= ?
                        ORDER BY stream_ordering ASC
                        LIMIT ?
                    ) AS event_batch
                    """,
                    (
                        last_event_stream_ordering,
                        stop_event_stream_ordering,
                        batch_size,
                    ),
                )

            # Get the results of the update
            (total_event_count, max_stream_ordering) = cast(
                Tuple[int, Optional[int]], txn.fetchone()
            )

            # Update the progress
            self.db_pool.updates._background_update_progress_txn(
                txn,
                _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                {
                    "last_event_stream_ordering": max_stream_ordering,
                    "stop_event_stream_ordering": stop_event_stream_ordering,
                },
            )

            return total_event_count

        num_rows_processed = await self.db_pool.runInteraction(
            "_event_stats_populate_counts_bg_update",
            _populate_txn,
        )

        # No more rows to process, so our background update is complete.
        if not num_rows_processed:
            await self.db_pool.updates._end_background_update(
                _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
            )

        return batch_size


def _resolve_stale_data_in_sliding_sync_tables(
    txn: LoggingTransaction,
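The trigger-maintained counter pattern above can be seen in isolation with plain `sqlite3` (a minimal sketch, independent of Synapse's schema and DB layer): a single-row stats table is kept in sync by `AFTER INSERT`/`AFTER DELETE` triggers, so reads never have to scan the events table.

```python
# Minimal standalone sketch of trigger-maintained counts using plain sqlite3.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (type TEXT, state_key TEXT);
    CREATE TABLE event_stats (total_event_count INTEGER NOT NULL DEFAULT 0);
    INSERT INTO event_stats (total_event_count) VALUES (0);

    CREATE TRIGGER events_insert AFTER INSERT ON events
    BEGIN
        UPDATE event_stats SET total_event_count = total_event_count + 1;
    END;

    CREATE TRIGGER events_delete AFTER DELETE ON events
    BEGIN
        UPDATE event_stats SET total_event_count = total_event_count - 1;
    END;
    """
)

conn.execute("INSERT INTO events VALUES ('m.room.message', NULL)")
conn.execute("INSERT INTO events VALUES ('m.room.encrypted', NULL)")
conn.execute("DELETE FROM events WHERE type = 'm.room.encrypted'")

# Reading the count is O(1) regardless of the size of `events`.
print(conn.execute("SELECT total_event_count FROM event_stats").fetchone()[0])  # 1
```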
@@ -126,6 +126,44 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):

        return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages)

    async def count_total_events(self) -> int:
        """
        Returns the total number of events present on the server.
        """
        return await self.db_pool.simple_select_one_onecol(
            table="event_stats",
            keyvalues={},
            retcol="total_event_count",
            desc="count_total_events",
        )

    async def count_total_messages(self) -> int:
        """
        Returns the total number of `m.room.message` events present on the
        server.
        """
        return await self.db_pool.simple_select_one_onecol(
            table="event_stats",
            keyvalues={},
            retcol="unencrypted_message_count",
            desc="count_total_messages",
        )

    async def count_total_e2ee_events(self) -> int:
        """
        Returns the total number of `m.room.encrypted` events present on the
        server.
        """
        return await self.db_pool.simple_select_one_onecol(
            table="event_stats",
            keyvalues={},
            retcol="e2ee_event_count",
            desc="count_total_e2ee_events",
        )

    async def count_daily_sent_e2ee_messages(self) -> int:
        def _count_messages(txn: LoggingTransaction) -> int:
            # This is good enough as if you have silly characters in your own
@@ -19,7 +19,7 @@
 #
 #

-SCHEMA_VERSION = 91  # remember to update the list below when updating
+SCHEMA_VERSION = 92  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema

 This should be incremented whenever the codebase changes its requirements on the

@@ -162,6 +162,12 @@ Changes in SCHEMA_VERSION = 89
 Changes in SCHEMA_VERSION = 90
 - Add a column `participant` to `room_memberships` table
 - Add background update to delete unreferenced state groups.

+Changes in SCHEMA_VERSION = 91
+- TODO
+
+Changes in SCHEMA_VERSION = 92
+- Add `event_stats` table to store global event statistics like total counts
+
 """
synapse/storage/schema/main/delta/92/01_event_stats.sql (new file, 33 lines)

@@ -0,0 +1,33 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.


-- Create the `event_stats` table to store these statistics.
CREATE TABLE event_stats (
    total_event_count INTEGER NOT NULL DEFAULT 0,
    unencrypted_message_count INTEGER NOT NULL DEFAULT 0,
    e2ee_event_count INTEGER NOT NULL DEFAULT 0
);

-- Insert initial values into the table.
INSERT INTO event_stats (
    total_event_count,
    unencrypted_message_count,
    e2ee_event_count
) VALUES (0, 0, 0);

-- Add a background update to populate the `event_stats` table with the current counts
-- from the `events` table and add triggers to keep this count up-to-date.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
    (9201, 'event_stats_populate_counts_bg_update', '{}');
@@ -52,3 +52,5 @@ class _BackgroundUpdates:
     MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE = (
         "mark_unreferenced_state_groups_for_deletion_bg_update"
     )

+    EVENT_STATS_POPULATE_COUNTS_BG_UPDATE = "event_stats_populate_counts_bg_update"
@@ -2378,7 +2378,7 @@ class GetRoomMembershipForUserAtToTokenTestCase(HomeserverTestCase):
 class GetRoomMembershipForUserAtToTokenShardTestCase(BaseMultiWorkerStreamTestCase):
     """
     Tests Sliding Sync handler `get_room_membership_for_user_at_to_token()` to make sure it works with
-    sharded event stream_writers enabled
+    sharded event stream_writers enabled ("event_persisters").
     """

     servlets = [
tests/metrics/test_phone_home_stats.py (new file, 264 lines)

@@ -0,0 +1,264 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.

import logging
from unittest.mock import AsyncMock

from twisted.test.proto_helpers import MemoryReactor

from synapse.app.phone_stats_home import (
    PHONE_HOME_INTERVAL_SECONDS,
    start_phone_stats_home,
)
from synapse.rest import admin, login, register, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock

from tests import unittest
from tests.server import ThreadedMemoryReactorClock

TEST_REPORT_STATS_ENDPOINT = "https://fake.endpoint/stats"
TEST_SERVER_CONTEXT = "test-server-context"


class PhoneHomeStatsTestCase(unittest.HomeserverTestCase):
    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room.register_servlets,
        register.register_servlets,
        login.register_servlets,
    ]

    def make_homeserver(
        self, reactor: ThreadedMemoryReactorClock, clock: Clock
    ) -> HomeServer:
        # Configure the homeserver to enable stats reporting.
        config = self.default_config()
        config["report_stats"] = True
        config["report_stats_endpoint"] = TEST_REPORT_STATS_ENDPOINT

        # Configure the server context so we can check it ends up being reported
        config["server_context"] = TEST_SERVER_CONTEXT

        # Allow guests to be registered
        config["allow_guest_access"] = True

        hs = self.setup_test_homeserver(config=config)

        # Replace the proxied http client with a mock, so we can inspect outbound
        # requests to the configured stats endpoint.
        self.put_json_mock = AsyncMock(return_value={})
        hs.get_proxied_http_client().put_json = self.put_json_mock  # type: ignore[method-assign]
        return hs

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main

        # Wait for the background updates to add the database triggers that keep the
        # `event_stats` table up-to-date.
        self.wait_for_background_updates()

        # Force stats reporting to occur
        start_phone_stats_home(hs=hs)

        super().prepare(reactor, clock, hs)

    def _get_latest_phone_home_stats(self) -> JsonDict:
        # Wait for `phone_stats_home` to be called again + a healthy margin (50s).
        self.reactor.advance(2 * PHONE_HOME_INTERVAL_SECONDS + 50)

        # Extract the reported stats from our http client mock
        mock_calls = self.put_json_mock.call_args_list
        report_stats_calls = []
        for call in mock_calls:
            if call.args[0] == TEST_REPORT_STATS_ENDPOINT:
                report_stats_calls.append(call)

        self.assertGreaterEqual(
            len(report_stats_calls),
            1,
            "Expected at least one call to the report_stats endpoint",
        )

        # Extract the phone home stats from the call
        phone_home_stats = report_stats_calls[0].args[1]

        return phone_home_stats

    def _perform_user_actions(self) -> None:
        """
        Perform some actions on the homeserver that would bump the phone home
        stats.

        This creates a few users (including a guest), a room, and sends some messages.
        Expected number of events:
        - 10 unencrypted messages
        - 5 encrypted messages
        - 24 total events (including room state, etc)
        """

        # Create some users
        user_1_mxid = self.register_user(
            username="test_user_1",
            password="test",
        )
        user_2_mxid = self.register_user(
            username="test_user_2",
            password="test",
        )
        # Note: `self.register_user` does not support guest registration, and updating
        # the Admin API it calls to add a new parameter would cause the `mac` parameter
        # to fail in a backwards-incompatible manner. Hence, we make a manual request
        # here.
        _guest_user_mxid = self.make_request(
            method="POST",
            path="/_matrix/client/v3/register?kind=guest",
            content={
                "username": "guest_user",
                "password": "test",
            },
            shorthand=False,
        )

        # Log in to each user
        user_1_token = self.login(username=user_1_mxid, password="test")
        user_2_token = self.login(username=user_2_mxid, password="test")

        # Create a room between the two users
        room_1_id = self.helper.create_room_as(
            is_public=False,
            tok=user_1_token,
        )

        # Mark this room as end-to-end encrypted
        self.helper.send_state(
            room_id=room_1_id,
            event_type="m.room.encryption",
            body={
                "algorithm": "m.megolm.v1.aes-sha2",
                "rotation_period_ms": 604800000,
                "rotation_period_msgs": 100,
            },
            state_key="",
            tok=user_1_token,
        )

        # User 1 invites user 2
        self.helper.invite(
            room=room_1_id,
            src=user_1_mxid,
            targ=user_2_mxid,
            tok=user_1_token,
        )

        # User 2 joins
        self.helper.join(
            room=room_1_id,
            user=user_2_mxid,
            tok=user_2_token,
        )

        # User 1 sends 10 unencrypted messages
        for _ in range(10):
            self.helper.send(
                room_id=room_1_id,
                body="Zoinks Scoob! A message!",
                tok=user_1_token,
            )

        # User 2 sends 5 encrypted "messages"
        for _ in range(5):
            self.helper.send_event(
                room_id=room_1_id,
                type="m.room.encrypted",
                content={
                    "algorithm": "m.olm.v1.curve25519-aes-sha2",
                    "sender_key": "some_key",
                    "ciphertext": {
                        "some_key": {
                            "type": 0,
                            "body": "encrypted_payload",
                        },
                    },
                },
                tok=user_2_token,
            )

    def test_phone_home_stats(self) -> None:
        """
        Test that the phone home stats contain the stats we expect based on
        the scenario carried out in `_perform_user_actions`
        """
        # Do things to bump the stats
        self._perform_user_actions()

        # Wait for the stats to be reported
        phone_home_stats = self._get_latest_phone_home_stats()

        self.assertEqual(
            phone_home_stats["homeserver"], self.hs.config.server.server_name
        )

        self.assertTrue(isinstance(phone_home_stats["memory_rss"], int))
        self.assertTrue(isinstance(phone_home_stats["cpu_average"], int))

        self.assertEqual(phone_home_stats["server_context"], TEST_SERVER_CONTEXT)

        self.assertTrue(isinstance(phone_home_stats["timestamp"], int))
        self.assertTrue(isinstance(phone_home_stats["uptime_seconds"], int))
        self.assertTrue(isinstance(phone_home_stats["python_version"], str))

        # We expect only our test users to exist on the homeserver
        self.assertEqual(phone_home_stats["total_users"], 3)
        self.assertEqual(phone_home_stats["total_nonbridged_users"], 3)
        self.assertEqual(phone_home_stats["daily_user_type_native"], 2)
        self.assertEqual(phone_home_stats["daily_user_type_guest"], 1)
        self.assertEqual(phone_home_stats["daily_user_type_bridged"], 0)
        self.assertEqual(phone_home_stats["total_room_count"], 1)
        self.assertEqual(phone_home_stats["total_event_count"], 24)
        self.assertEqual(phone_home_stats["total_message_count"], 10)
        self.assertEqual(phone_home_stats["total_e2ee_event_count"], 5)
        self.assertEqual(phone_home_stats["daily_active_users"], 2)
        self.assertEqual(phone_home_stats["monthly_active_users"], 2)
        self.assertEqual(phone_home_stats["daily_active_rooms"], 1)
        self.assertEqual(phone_home_stats["daily_active_e2ee_rooms"], 1)
        self.assertEqual(phone_home_stats["daily_messages"], 10)
        self.assertEqual(phone_home_stats["daily_e2ee_messages"], 5)
        self.assertEqual(phone_home_stats["daily_sent_messages"], 10)
        self.assertEqual(phone_home_stats["daily_sent_e2ee_messages"], 5)

        # Our users have not been around for >30 days, hence these are all 0.
        self.assertEqual(phone_home_stats["r30v2_users_all"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_android"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_ios"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_electron"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_web"], 0)
        self.assertEqual(
            phone_home_stats["cache_factor"], self.hs.config.caches.global_factor
        )
        self.assertEqual(
            phone_home_stats["event_cache_size"],
            self.hs.config.caches.event_cache_size,
        )
        self.assertEqual(
            phone_home_stats["database_engine"],
            self.hs.config.database.databases[0].config["name"],
        )
        self.assertEqual(
            phone_home_stats["database_server_version"],
            self.hs.get_datastores().main.database_engine.server_version,
        )

        synapse_logger = logging.getLogger("synapse")
        log_level = synapse_logger.getEffectiveLevel()
        self.assertEqual(phone_home_stats["log_level"], logging.getLevelName(log_level))
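A tiny self-contained sketch of the mock-inspection pattern used in `_get_latest_phone_home_stats` above: replace an async HTTP method with an `AsyncMock`, then filter `call_args_list` by the first positional argument (the URLs and payload here are made up):

```python
import asyncio
from unittest.mock import AsyncMock

async def main() -> None:
    put_json = AsyncMock(return_value={})
    await put_json("https://fake.endpoint/stats", {"total_users": 3})
    await put_json("https://other.endpoint", {"ignored": True})

    # Keep only the calls that went to the stats endpoint.
    stats_calls = [
        c for c in put_json.call_args_list
        if c.args[0] == "https://fake.endpoint/stats"
    ]
    assert len(stats_calls) == 1
    assert stats_calls[0].args[1]["total_users"] == 3

asyncio.run(main())
```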
@@ -486,7 +486,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
     """

     def __init__(self) -> None:
-        self.threadpool = ThreadPool(self)
+        self.threadpool = ThreadlessThreadPool(self)

         self._tcp_callbacks: Dict[Tuple[str, int], Callable] = {}
         self._udp: List[udp.Port] = []

@@ -733,12 +733,12 @@ def make_fake_db_pool(
     pool.runWithConnection = runWithConnection  # type: ignore[method-assign]
     pool.runInteraction = runInteraction  # type: ignore[assignment]
     # Replace the thread pool with a threadless 'thread' pool
-    pool.threadpool = ThreadPool(reactor)
+    pool.threadpool = ThreadlessThreadPool(reactor)
     pool.running = True
     return pool


-class ThreadPool:
+class ThreadlessThreadPool:
     """
     Threadless thread pool.
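The diff only shows the rename, so as a rough, hypothetical sketch of the idea behind a "threadless" pool (the real `tests/server.py` implementation differs in detail): anything submitted through Twisted's `callInThreadWithCallback` interface simply runs inline, and the result is delivered through the (fake) reactor, keeping tests single-threaded and deterministic.

```python
# Hypothetical, simplified "threadless" pool implementing the same interface as
# twisted.python.threadpool.ThreadPool.callInThreadWithCallback.
from typing import Any, Callable

class ThreadlessThreadPool:
    def __init__(self, reactor: Any) -> None:
        self._reactor = reactor

    def start(self) -> None:
        pass  # no real threads to start

    def stop(self) -> None:
        pass

    def callInThreadWithCallback(
        self,
        onResult: Callable[[bool, Any], None],
        function: Callable[..., Any],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # Run the work synchronously, then hand the result back via the reactor.
        try:
            result = function(*args, **kwargs)
        except Exception as e:
            self._reactor.callLater(0, onResult, False, e)
        else:
            self._reactor.callLater(0, onResult, True, result)

if __name__ == "__main__":
    class _InlineReactor:
        def callLater(self, _delay: float, f: Callable[..., None], *a: Any) -> None:
            f(*a)

    pool = ThreadlessThreadPool(_InlineReactor())
    pool.callInThreadWithCallback(lambda ok, r: print(ok, r), lambda x: x * 2, 21)
    # prints: True 42
```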
tests/storage/test_event_stats.py (new file, 544 lines)

@@ -0,0 +1,544 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
import logging
import time
from unittest.mock import patch

from twisted.test.proto_helpers import MemoryReactor

from synapse.logging.context import defer_to_thread, run_in_background
from synapse.rest import admin, login, register, room
from synapse.server import HomeServer
from synapse.storage.database import (
    LoggingTransaction,
)
from synapse.types.storage import _BackgroundUpdates
from synapse.util import Clock

from tests import unittest
from tests.replication._base import BaseMultiWorkerStreamTestCase

logger = logging.getLogger(__name__)


class EventStatsTestCase(unittest.HomeserverTestCase):
    """
    Tests for the `event_stats` table
    """

    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room.register_servlets,
        register.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.hs = hs
        self.store = hs.get_datastores().main

        # Wait for the background updates to add the database triggers that keep the
        # `event_stats` table up-to-date.
        #
        # This also prevents background updates running during the tests and messing
        # with the results.
        self.wait_for_background_updates()

        super().prepare(reactor, clock, hs)

    def _perform_user_actions(self) -> None:
        """
        Perform some actions on the homeserver that would bump the event counts.

        This creates a few users, a room, and sends some messages. Expected number of
        events:
        - 10 unencrypted messages
        - 5 encrypted messages
        - 24 total events (including room state, etc)
        """
        # Create some users
        user_1_mxid = self.register_user(
            username="test_user_1",
            password="test",
        )
        user_2_mxid = self.register_user(
            username="test_user_2",
            password="test",
        )

        # Log in to each user
        user_1_token = self.login(username=user_1_mxid, password="test")
        user_2_token = self.login(username=user_2_mxid, password="test")

        # Create a room between the two users
        room_1_id = self.helper.create_room_as(
            is_public=False,
            tok=user_1_token,
        )

        # Mark this room as end-to-end encrypted
        self.helper.send_state(
            room_id=room_1_id,
            event_type="m.room.encryption",
            body={
                "algorithm": "m.megolm.v1.aes-sha2",
                "rotation_period_ms": 604800000,
                "rotation_period_msgs": 100,
            },
            state_key="",
            tok=user_1_token,
        )

        # User 1 invites user 2
        self.helper.invite(
            room=room_1_id,
            src=user_1_mxid,
            targ=user_2_mxid,
            tok=user_1_token,
        )

        # User 2 joins
        self.helper.join(
            room=room_1_id,
            user=user_2_mxid,
            tok=user_2_token,
        )

        # User 1 sends 10 unencrypted messages
        for _ in range(10):
            self.helper.send(
                room_id=room_1_id,
                body="Zoinks Scoob! A message!",
                tok=user_1_token,
            )

        # User 2 sends 5 encrypted "messages"
        for _ in range(5):
            self.helper.send_event(
                room_id=room_1_id,
                type="m.room.encrypted",
                content={
                    "algorithm": "m.olm.v1.curve25519-aes-sha2",
                    "sender_key": "some_key",
                    "ciphertext": {
                        "some_key": {
                            "type": 0,
                            "body": "encrypted_payload",
                        },
                    },
                },
                tok=user_2_token,
            )

    def test_concurrent_event_insert(self) -> None:
        """
        TODO

        Normally, the `events` stream is covered by a single "event_persister" worker
        but it does experimentally support multiple workers, where load is sharded
        between them by room ID.

        If we don't pay special attention to this, we will see errors like the following:
        ```
        psycopg2.errors.SerializationFailure: could not serialize access due to concurrent update
        CONTEXT:  SQL statement "UPDATE event_stats SET total_event_count = total_event_count + 1"
        ```

        This is a regression test for https://github.com/element-hq/synapse/issues/18349
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        room_id1 = self.helper.create_room_as(
            is_public=False,
            tok=user1_tok,
        )

        block = True

        # Create a transaction that interacts with the `event_stats` table
        def _todo_txn(
            txn: LoggingTransaction,
        ) -> None:
            nonlocal block
            while block:
                logger.info("qwer")
                # txn.execute(
                #     "UPDATE event_stats SET total_event_count = total_event_count + 1",
                # )
                time.sleep(0.1)
            logger.info("qwer done")

            # We need to return something, so we return None.
            return None

        # def asdf() -> None:
        #     self.get_success(self.store.db_pool.runInteraction("test", _todo_txn))

        # Start a transaction that is interacting with the `event_stats` table
        # start_txn = defer_to_thread(self.reactor, asdf)

        start_txn = run_in_background(
            self.store.db_pool.runInteraction, "test", _todo_txn
        )

        logger.info("asdf1")
        _event_response = self.get_success(
            defer_to_thread(
                self.reactor, self.helper.send, room_id1, "activity", tok=user1_tok
            )
        )
        logger.info("asdf2")

        block = False

        self.get_success(start_txn)
        # self.pump(0.1)

    def test_background_update_with_events(self) -> None:
        """
        Test that the background update to populate the `event_stats` table works
        correctly when there are events in the database.
        """
        # Do things to bump the stats
        self._perform_user_actions()

        # Since the background update has already run once when Synapse started, let's
        # manually reset the database `event_stats` back to 0 to ensure this test is
        # starting from a clean slate. We want to be able to detect 0 -> 24 instead of
        # 24 -> 24 as it's not possible to prove that any work was actually done if the
        # number doesn't change.
        self.get_success(
            self.store.db_pool.simple_update_one(
                table="event_stats",
                keyvalues={},
                updatevalues={
                    "total_event_count": 0,
                    "unencrypted_message_count": 0,
                    "e2ee_event_count": 0,
                },
                desc="reset event_stats in test preparation",
            )
        )
        self.assertEqual(self.get_success(self.store.count_total_events()), 0)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)

        # Run the background update again
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                {
                    "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    "progress_json": "{}",
                },
            )
        )
        self.store.db_pool.updates._all_done = False
        self.wait_for_background_updates()

        # Expect our `event_stats` table to be populated with the correct values
        self.assertEqual(self.get_success(self.store.count_total_events()), 24)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 10)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5)

    def test_background_update_without_events(self) -> None:
        """
        Test that the background update to populate the `event_stats` table works
        correctly without events in the database.
        """
        # Keep in mind: these are already populated, as the background update has
        # already run once when Synapse started and added the database triggers which
        # are incrementing things as new events come in.
        #
        # In this case, no events have been sent, so we expect the counts to be 0.
        self.assertEqual(self.get_success(self.store.count_total_events()), 0)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)

        # Run the background update again
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                {
                    "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    "progress_json": "{}",
                },
            )
        )
        self.store.db_pool.updates._all_done = False
        self.wait_for_background_updates()

        self.assertEqual(self.get_success(self.store.count_total_events()), 0)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)

    def test_background_update_resume_progress(self) -> None:
        """
        Test that the background update to populate the `event_stats` table works
        correctly to resume from `progress_json`.
        """
        # Do things to bump the stats
        self._perform_user_actions()

        # Keep in mind: these are already populated, as the background update has
        # already run once when Synapse started and added the database triggers which
        # are incrementing things as new events come in.
        self.assertEqual(self.get_success(self.store.count_total_events()), 24)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 10)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5)

        # Run the background update again
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                {
                    "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    "progress_json": '{ "last_event_stream_ordering": 14, "stop_event_stream_ordering": 21 }',
                },
            )
        )
        self.store.db_pool.updates._all_done = False
        self.wait_for_background_updates()

        # We expect these values to increase as the background update is being run
        # *again* and will double-count some of the `events` over the range specified
        # by the `progress_json`.
        self.assertEqual(self.get_success(self.store.count_total_events()), 24 + 7)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 16)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 6)


class EventStatsConcurrentEventsTestCase(BaseMultiWorkerStreamTestCase):
    """
    Test `event_stats` when events are being inserted/deleted concurrently with sharded
    event stream_writers enabled ("event_persisters").
    """

    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room.register_servlets,
        login.register_servlets,
    ]

    def default_config(self) -> dict:
        config = super().default_config()

        # Enable shared event stream_writers
        config["stream_writers"] = {"events": ["worker1", "worker2", "worker3"]}
        config["instance_map"] = {
            "main": {"host": "testserv", "port": 8765},
            "worker1": {"host": "testserv", "port": 1001},
            "worker2": {"host": "testserv", "port": 1002},
            "worker3": {"host": "testserv", "port": 1003},
        }
        return config

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = self.hs.get_datastores().main
        self.event_sources = hs.get_event_sources()

    def _create_room(self, room_id: str, user_id: str, tok: str) -> None:
        """
        Create a room with a specific room_id. We use this so that we have a
        consistent room_id across test runs that hashes to the same value and will be
        sharded to a known worker in the tests.
        """

        # We control the room ID generation by patching out the
        # `_generate_room_id` method
        with patch(
            "synapse.handlers.room.RoomCreationHandler._generate_room_id"
        ) as mock:
            mock.side_effect = lambda: room_id
            self.helper.create_room_as(user_id, tok=tok)

    def test_concurrent_event_insert(self) -> None:
        """
        TODO

        Normally, the `events` stream is covered by a single "event_persister" worker
        but it does experimentally support multiple workers, where load is sharded
        between them by room ID.

        If we don't pay special attention to this, we will see errors like the following:
        ```
        psycopg2.errors.SerializationFailure: could not serialize access due to concurrent update
        CONTEXT:  SQL statement "UPDATE event_stats SET total_event_count = total_event_count + 1"
        ```

        This is a regression test for https://github.com/element-hq/synapse/issues/18349
        """
        worker_hs1 = self.make_worker_hs(
            "synapse.app.generic_worker",
            {"worker_name": "worker1"},
        )

        worker_hs2 = self.make_worker_hs(
            "synapse.app.generic_worker",
            {"worker_name": "worker2"},
        )

        self.make_worker_hs(
            "synapse.app.generic_worker",
            {"worker_name": "worker3"},
        )

        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        room_id1 = self.helper.create_room_as(
            is_public=False,
            tok=user1_tok,
        )

        block = True

        # Create a transaction that interacts with the `event_stats` table
        def _todo_txn(
            txn: LoggingTransaction,
        ) -> None:
            nonlocal block
            while block:
                logger.info("qwer")
                # txn.execute(
                #     "UPDATE event_stats SET total_event_count = total_event_count + 1",
                # )
                time.sleep(0.1)
            logger.info("qwer done")

            # We need to return something, so we return None.
            return None

        # Start a transaction that is interacting with the `event_stats` table
        #
        # Try from worker2 which may have its own thread pool.
        worker2_store = worker_hs2.get_datastores().main
        start_txn = run_in_background(
            worker2_store.db_pool.runInteraction, "test", _todo_txn
        )

        # Then in room1 (handled by worker1) we send an event.
        logger.info("asdf1")
        _event_response = self.helper.send(room_id1, "activity", tok=user1_tok)
        logger.info("asdf2")

        block = False

        self.get_success(start_txn)
        # self.pump(0.1)

    def test_sharded_event_persisters(self) -> None:
        """
        TODO

        The test creates three event persister workers and a room that is sharded to
        each worker.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")

        self.make_worker_hs(
            "synapse.app.generic_worker",
            {"worker_name": "worker1"},
        )

        self.make_worker_hs(
            "synapse.app.generic_worker",
            {"worker_name": "worker2"},
        )

        self.make_worker_hs(
            "synapse.app.generic_worker",
            {"worker_name": "worker3"},
        )

        # Specially crafted room IDs that get persisted on different workers.
        #
        # Sharded to worker1
        room_id1 = "!fooo:test"
        # Sharded to worker2
        room_id2 = "!bar:test"
        # Sharded to worker3
        room_id3 = "!quux:test"

        # Create rooms on the different workers.
        self._create_room(room_id1, user1_id, user1_tok)
        self._create_room(room_id2, user1_id, user1_tok)
        self._create_room(room_id3, user1_id, user1_tok)

        # Ensure that the events were sharded to different workers.
        pos1 = self.get_success(
            self.store.get_position_for_event(
                self.get_success(
                    self.store.get_create_event_for_room(room_id1)
                ).event_id
            )
        )
        self.assertEqual(pos1.instance_name, "worker1")
        pos2 = self.get_success(
            self.store.get_position_for_event(
                self.get_success(
                    self.store.get_create_event_for_room(room_id2)
                ).event_id
            )
        )
        self.assertEqual(pos2.instance_name, "worker2")
        pos3 = self.get_success(
            self.store.get_position_for_event(
                self.get_success(
                    self.store.get_create_event_for_room(room_id3)
                ).event_id
            )
        )
        self.assertEqual(pos3.instance_name, "worker3")

        def send_events_in_room_id(room_id: str) -> None:
            for i in range(
                2
                # 10
            ):
                logger.info("Sending event %s in %s", i, room_id)
                # self.helper.send(room_id1, f"activity{i}", tok=user1_tok)
                defer_to_thread(
                    self.reactor,
                    self.helper.send,
                    room_id,
                    f"activity{i}",
                    tok=user1_tok,
                )

        # Start creating events in the rooms at the same time.
        wait_send_events_in_room1 = defer_to_thread(
            self.reactor, send_events_in_room_id, room_id1
        )
        wait_send_events_in_room2 = defer_to_thread(
            self.reactor, send_events_in_room_id, room_id2
        )
        wait_send_events_in_room3 = defer_to_thread(
            self.reactor, send_events_in_room_id, room_id3
        )

        # self.pump(0.1)

        # Wait for the events to be sent
        self.get_success(wait_send_events_in_room1)
        self.get_success(wait_send_events_in_room2)
        self.get_success(wait_send_events_in_room3)
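The `SerializationFailure` mentioned in the TODO docstrings above is PostgreSQL signalling that two concurrent transactions both tried to update the same `event_stats` row. A common generic mitigation is to retry the failed transaction; a sketch in plain `psycopg2` follows (a standard pattern, not necessarily the fix this branch settles on):

```python
# Generic retry-on-serialization-failure sketch in plain psycopg2.
from typing import Callable, TypeVar

import psycopg2
from psycopg2 import errors

T = TypeVar("T")

def run_with_retries(
    conn: "psycopg2.extensions.connection",
    fn: Callable[["psycopg2.extensions.cursor"], T],
    attempts: int = 5,
) -> T:
    for attempt in range(attempts):
        try:
            # `with conn` commits on success and rolls back on exception.
            with conn, conn.cursor() as cur:
                return fn(cur)
        except errors.SerializationFailure:
            if attempt == attempts - 1:
                raise
    raise AssertionError("unreachable")

# Usage: each concurrent writer wraps its counter bump in the retry helper.
# run_with_retries(conn, lambda cur: cur.execute(
#     "UPDATE event_stats SET total_event_count = total_event_count + 1"
# ))
```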