mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-09 01:30:18 +00:00
Compare commits
71 Commits
madlittlem
...
rei/roomdi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9df5c463fb | ||
|
|
ffc30b8f60 | ||
|
|
304bc2f999 | ||
|
|
a04f01eb15 | ||
|
|
28c742149d | ||
|
|
b396297930 | ||
|
|
30d7bc2821 | ||
|
|
295b1de9e6 | ||
|
|
d5cc8b3e51 | ||
|
|
8c0260299f | ||
|
|
757205d718 | ||
|
|
6c582d7ccb | ||
|
|
4c13f2b282 | ||
|
|
9dbf42af8a | ||
|
|
60481031f2 | ||
|
|
7c0224d5c0 | ||
|
|
f7ececb0ac | ||
|
|
39dbee2a3e | ||
|
|
4444b9a1b3 | ||
|
|
3b69bf3e74 | ||
|
|
73d552a05d | ||
|
|
b06f2947e4 | ||
|
|
d7a692f860 | ||
|
|
a13ad21abf | ||
|
|
bc2c284dbe | ||
|
|
3cdce28d3b | ||
|
|
81aa6d53b0 | ||
|
|
dfb22fec48 | ||
|
|
cc66cf1238 | ||
|
|
a344ad3d3f | ||
|
|
b9f1adc370 | ||
|
|
1af7866562 | ||
|
|
064143c130 | ||
|
|
324f21b216 | ||
|
|
10c1a233f9 | ||
|
|
44d3c2e80b | ||
|
|
07c267c516 | ||
|
|
62b1250629 | ||
|
|
11c4e506bd | ||
|
|
491eaf0808 | ||
|
|
dd8e6020d8 | ||
|
|
99c88ac84e | ||
|
|
3b09a37682 | ||
|
|
bc754cdeed | ||
|
|
c775f310e9 | ||
|
|
09cbc3a8e9 | ||
|
|
736ac58e11 | ||
|
|
a6c102009e | ||
|
|
544ba2c2e9 | ||
|
|
81c5289c83 | ||
|
|
4b7bf2e413 | ||
|
|
5043ef801a | ||
|
|
baeaf00a12 | ||
|
|
1ecd1a6a5f | ||
|
|
c3d2bf2807 | ||
|
|
79252d1c83 | ||
|
|
e8fc180d4d | ||
|
|
7b657f1148 | ||
|
|
18a4c03c50 | ||
|
|
eafa8d3c54 | ||
|
|
977310ee27 | ||
|
|
981c6cf544 | ||
|
|
6a19f7e101 | ||
|
|
4a97eef0dc | ||
|
|
b5573c0ffb | ||
|
|
1819563640 | ||
|
|
e4cbea6c46 | ||
|
|
80a1c6e9e5 | ||
|
|
d7675e79e1 | ||
|
|
8de9ebe35d | ||
|
|
8374bcb0a8 |
1
changelog.d/5879.misc
Normal file
1
changelog.d/5879.misc
Normal file
@@ -0,0 +1 @@
|
||||
Rework room and user statistics to separate current & historical rows, as well as track stats correctly.
|
||||
1
changelog.d/5947.misc
Normal file
1
changelog.d/5947.misc
Normal file
@@ -0,0 +1 @@
|
||||
Perform room directory searches more efficiently, using room statistics.
|
||||
@@ -27,19 +27,16 @@ class StatsConfig(Config):
|
||||
|
||||
def read_config(self, config, **kwargs):
|
||||
self.stats_enabled = True
|
||||
self.stats_bucket_size = 86400
|
||||
self.stats_bucket_size = 86400 * 1000
|
||||
self.stats_retention = sys.maxsize
|
||||
stats_config = config.get("stats", None)
|
||||
if stats_config:
|
||||
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
|
||||
self.stats_bucket_size = (
|
||||
self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
|
||||
self.stats_bucket_size = self.parse_duration(
|
||||
stats_config.get("bucket_size", "1d")
|
||||
)
|
||||
self.stats_retention = (
|
||||
self.parse_duration(
|
||||
stats_config.get("retention", "%ds" % (sys.maxsize,))
|
||||
)
|
||||
/ 1000
|
||||
self.stats_retention = self.parse_duration(
|
||||
stats_config.get("retention", "%ds" % (sys.maxsize,))
|
||||
)
|
||||
|
||||
def generate_config_section(self, config_dir_path, server_name, **kwargs):
|
||||
|
||||
@@ -761,6 +761,10 @@ class PublicRoomList(BaseFederationServlet):
|
||||
else:
|
||||
network_tuple = ThirdPartyInstanceID(None, None)
|
||||
|
||||
if limit == 0:
|
||||
# zero is a special value which corresponds to no limit.
|
||||
limit = None
|
||||
|
||||
data = await maybeDeferred(
|
||||
self.handler.get_local_public_room_list,
|
||||
limit,
|
||||
@@ -796,6 +800,10 @@ class PublicRoomList(BaseFederationServlet):
|
||||
if search_filter is None:
|
||||
logger.warning("Nonefilter")
|
||||
|
||||
if limit == 0:
|
||||
# zero is a special value which corresponds to no limit.
|
||||
limit = None
|
||||
|
||||
data = await self.handler.get_local_public_room_list(
|
||||
limit=limit,
|
||||
since_token=since_token,
|
||||
|
||||
@@ -17,7 +17,6 @@ import logging
|
||||
from collections import namedtuple
|
||||
|
||||
from six import PY3, iteritems
|
||||
from six.moves import range
|
||||
|
||||
import msgpack
|
||||
from unpaddedbase64 import decode_base64, encode_base64
|
||||
@@ -25,9 +24,8 @@ from unpaddedbase64 import decode_base64, encode_base64
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules
|
||||
from synapse.api.errors import Codes, HttpResponseException
|
||||
from synapse.api.errors import Codes, HttpResponseException, SynapseError
|
||||
from synapse.types import ThirdPartyInstanceID
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
|
||||
@@ -37,7 +35,6 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
|
||||
|
||||
|
||||
# This is used to indicate we should only return rooms published to the main list.
|
||||
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
|
||||
|
||||
@@ -72,6 +69,8 @@ class RoomListHandler(BaseHandler):
|
||||
This can be (None, None) to indicate the main list, or a particular
|
||||
appservice and network id to use an appservice specific one.
|
||||
Setting to None returns all public rooms across all lists.
|
||||
from_federation (bool): true iff the request comes from the federation
|
||||
API
|
||||
"""
|
||||
if not self.enable_room_list_search:
|
||||
return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
|
||||
@@ -133,200 +132,113 @@ class RoomListHandler(BaseHandler):
|
||||
from_federation (bool): Whether this request originated from a
|
||||
federating server or a client. Used for room filtering.
|
||||
timeout (int|None): Amount of seconds to wait for a response before
|
||||
timing out.
|
||||
timing out. TODO
|
||||
"""
|
||||
pagination_token = None
|
||||
if since_token and since_token != "END":
|
||||
since_token = RoomListNextBatch.from_token(since_token)
|
||||
if since_token[0] in ("+", "-"):
|
||||
forwards = since_token[0] == "+"
|
||||
pagination_token = since_token[1:]
|
||||
else:
|
||||
raise SynapseError(400, "Invalid since token.")
|
||||
else:
|
||||
since_token = None
|
||||
forwards = True
|
||||
|
||||
rooms_to_order_value = {}
|
||||
rooms_to_num_joined = {}
|
||||
# we request one more than wanted to see if there are more pages to come
|
||||
probing_limit = limit + 1 if limit is not None else None
|
||||
|
||||
newly_visible = []
|
||||
newly_unpublished = []
|
||||
if since_token:
|
||||
stream_token = since_token.stream_ordering
|
||||
current_public_id = yield self.store.get_current_public_room_stream_id()
|
||||
public_room_stream_id = since_token.public_room_stream_id
|
||||
newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
|
||||
public_room_stream_id, current_public_id, network_tuple=network_tuple
|
||||
)
|
||||
else:
|
||||
stream_token = yield self.store.get_room_max_stream_ordering()
|
||||
public_room_stream_id = yield self.store.get_current_public_room_stream_id()
|
||||
|
||||
room_ids = yield self.store.get_public_room_ids_at_stream_id(
|
||||
public_room_stream_id, network_tuple=network_tuple
|
||||
results = yield self.store.get_largest_public_rooms(
|
||||
network_tuple,
|
||||
search_filter,
|
||||
probing_limit,
|
||||
pagination_token,
|
||||
forwards,
|
||||
fetch_creation_event_ids=from_federation,
|
||||
)
|
||||
|
||||
# We want to return rooms in a particular order: the number of joined
|
||||
# users. We then arbitrarily use the room_id as a tie breaker.
|
||||
def build_room_entry(room):
|
||||
entry = {
|
||||
"room_id": room["room_id"],
|
||||
"name": room["name"],
|
||||
"topic": room["topic"],
|
||||
"canonical_alias": room["canonical_alias"],
|
||||
"num_joined_members": room["joined_members"],
|
||||
"avatar_url": room["avatar"],
|
||||
"world_readable": room["history_visibility"] == "world_readable",
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_order_for_room(room_id):
|
||||
# Most of the rooms won't have changed between the since token and
|
||||
# now (especially if the since token is "now"). So, we can ask what
|
||||
# the current users are in a room (that will hit a cache) and then
|
||||
# check if the room has changed since the since token. (We have to
|
||||
# do it in that order to avoid races).
|
||||
# If things have changed then fall back to getting the current state
|
||||
# at the since token.
|
||||
joined_users = yield self.store.get_users_in_room(room_id)
|
||||
if self.store.has_room_changed_since(room_id, stream_token):
|
||||
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
|
||||
room_id, stream_token
|
||||
)
|
||||
# Filter out Nones – rather omit the field altogether
|
||||
return {k: v for k, v in entry.items() if v is not None}
|
||||
|
||||
if not latest_event_ids:
|
||||
return
|
||||
if from_federation:
|
||||
room_creation_event_ids = [r["creation_event_id"] for r in results]
|
||||
|
||||
joined_users = yield self.state_handler.get_current_users_in_room(
|
||||
room_id, latest_event_ids
|
||||
)
|
||||
results = [build_room_entry(r) for r in results]
|
||||
|
||||
num_joined_users = len(joined_users)
|
||||
rooms_to_num_joined[room_id] = num_joined_users
|
||||
|
||||
if num_joined_users == 0:
|
||||
return
|
||||
|
||||
# We want larger rooms to be first, hence negating num_joined_users
|
||||
rooms_to_order_value[room_id] = (-num_joined_users, room_id)
|
||||
|
||||
logger.info(
|
||||
"Getting ordering for %i rooms since %s", len(room_ids), stream_token
|
||||
)
|
||||
yield concurrently_execute(get_order_for_room, room_ids, 10)
|
||||
|
||||
sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
|
||||
sorted_rooms = [room_id for room_id, _ in sorted_entries]
|
||||
|
||||
# `sorted_rooms` should now be a list of all public room ids that is
|
||||
# stable across pagination. Therefore, we can use indices into this
|
||||
# list as our pagination tokens.
|
||||
|
||||
# Filter out rooms that we don't want to return
|
||||
rooms_to_scan = [
|
||||
r
|
||||
for r in sorted_rooms
|
||||
if r not in newly_unpublished and rooms_to_num_joined[r] > 0
|
||||
]
|
||||
|
||||
total_room_count = len(rooms_to_scan)
|
||||
|
||||
if since_token:
|
||||
# Filter out rooms we've already returned previously
|
||||
# `since_token.current_limit` is the index of the last room we
|
||||
# sent down, so we exclude it and everything before/after it.
|
||||
if since_token.direction_is_forward:
|
||||
rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
|
||||
response = {}
|
||||
num_results = len(results)
|
||||
if num_results > 0:
|
||||
final_room_id = results[-1]["room_id"]
|
||||
initial_room_id = results[0]["room_id"]
|
||||
if limit is not None:
|
||||
more_to_come = num_results == probing_limit
|
||||
results = results[0:limit]
|
||||
else:
|
||||
rooms_to_scan = rooms_to_scan[: since_token.current_limit]
|
||||
rooms_to_scan.reverse()
|
||||
more_to_come = False
|
||||
|
||||
logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
|
||||
if not forwards or (forwards and more_to_come):
|
||||
response["next_batch"] = "+%s" % (final_room_id,)
|
||||
|
||||
# _append_room_entry_to_chunk will append to chunk but will stop if
|
||||
# len(chunk) > limit
|
||||
#
|
||||
# Normally we will generate enough results on the first iteration here,
|
||||
# but if there is a search filter, _append_room_entry_to_chunk may
|
||||
# filter some results out, in which case we loop again.
|
||||
#
|
||||
# We don't want to scan over the entire range either as that
|
||||
# would potentially waste a lot of work.
|
||||
#
|
||||
# XXX if there is no limit, we may end up DoSing the server with
|
||||
# calls to get_current_state_ids for every single room on the
|
||||
# server. Surely we should cap this somehow?
|
||||
#
|
||||
if limit:
|
||||
step = limit + 1
|
||||
else:
|
||||
# step cannot be zero
|
||||
step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
|
||||
if since_token and (forwards or (not forwards and more_to_come)):
|
||||
if num_results > 0:
|
||||
response["prev_batch"] = "-%s" % (initial_room_id,)
|
||||
else:
|
||||
response["prev_batch"] = "-%s" % (pagination_token,)
|
||||
|
||||
chunk = []
|
||||
for i in range(0, len(rooms_to_scan), step):
|
||||
if timeout and self.clock.time() > timeout:
|
||||
raise Exception("Timed out searching room directory")
|
||||
if from_federation:
|
||||
# only show rooms with m.federate=True or absent (default is True)
|
||||
|
||||
batch = rooms_to_scan[i : i + step]
|
||||
logger.info("Processing %i rooms for result", len(batch))
|
||||
yield concurrently_execute(
|
||||
lambda r: self._append_room_entry_to_chunk(
|
||||
r,
|
||||
rooms_to_num_joined[r],
|
||||
chunk,
|
||||
limit,
|
||||
search_filter,
|
||||
from_federation=from_federation,
|
||||
),
|
||||
batch,
|
||||
5,
|
||||
)
|
||||
logger.info("Now %i rooms in result", len(chunk))
|
||||
if len(chunk) >= limit + 1:
|
||||
break
|
||||
# we already have rooms' creation state events' IDs
|
||||
# so get rooms' creation state events
|
||||
creation_events_by_id = yield self.store.get_events(room_creation_event_ids)
|
||||
|
||||
chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
|
||||
# now filter out rooms with m.federate: False in their create event
|
||||
results = [
|
||||
room
|
||||
for (room, room_creation_event_id) in zip(
|
||||
results, room_creation_event_ids
|
||||
)
|
||||
if creation_events_by_id[room_creation_event_id].content.get(
|
||||
"m.federate", True
|
||||
)
|
||||
]
|
||||
|
||||
# Work out the new limit of the batch for pagination, or None if we
|
||||
# know there are no more results that would be returned.
|
||||
# i.e., [since_token.current_limit..new_limit] is the batch of rooms
|
||||
# we've returned (or the reverse if we paginated backwards)
|
||||
# We tried to pull out limit + 1 rooms above, so if we have <= limit
|
||||
# then we know there are no more results to return
|
||||
new_limit = None
|
||||
if chunk and (not limit or len(chunk) > limit):
|
||||
for room in results:
|
||||
# populate search result entries with additional fields, namely
|
||||
# 'aliases' and 'guest_can_join'
|
||||
room_id = room["room_id"]
|
||||
|
||||
if not since_token or since_token.direction_is_forward:
|
||||
if limit:
|
||||
chunk = chunk[:limit]
|
||||
last_room_id = chunk[-1]["room_id"]
|
||||
else:
|
||||
if limit:
|
||||
chunk = chunk[-limit:]
|
||||
last_room_id = chunk[0]["room_id"]
|
||||
aliases = yield self.store.get_aliases_for_room(room_id)
|
||||
if aliases:
|
||||
room["aliases"] = aliases
|
||||
|
||||
new_limit = sorted_rooms.index(last_room_id)
|
||||
state_ids = yield self.store.get_current_state_ids(room_id)
|
||||
guests_can_join = False
|
||||
guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
|
||||
if guest_access_state_id is not None:
|
||||
guest_access = yield self.store.get_event(guest_access_state_id)
|
||||
if guest_access is not None:
|
||||
if guest_access.content.get("guest_access") == "can_join":
|
||||
guests_can_join = True
|
||||
room["guest_can_join"] = guests_can_join
|
||||
|
||||
results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
|
||||
response["chunk"] = results
|
||||
|
||||
if since_token:
|
||||
results["new_rooms"] = bool(newly_visible)
|
||||
# TODO for federation, we currently don't remove m.federate=False rooms
|
||||
# from the total room count estimate.
|
||||
response["total_room_count_estimate"] = yield self.store.count_public_rooms()
|
||||
|
||||
if not since_token or since_token.direction_is_forward:
|
||||
if new_limit is not None:
|
||||
results["next_batch"] = RoomListNextBatch(
|
||||
stream_ordering=stream_token,
|
||||
public_room_stream_id=public_room_stream_id,
|
||||
current_limit=new_limit,
|
||||
direction_is_forward=True,
|
||||
).to_token()
|
||||
|
||||
if since_token:
|
||||
results["prev_batch"] = since_token.copy_and_replace(
|
||||
direction_is_forward=False,
|
||||
current_limit=since_token.current_limit + 1,
|
||||
).to_token()
|
||||
else:
|
||||
if new_limit is not None:
|
||||
results["prev_batch"] = RoomListNextBatch(
|
||||
stream_ordering=stream_token,
|
||||
public_room_stream_id=public_room_stream_id,
|
||||
current_limit=new_limit,
|
||||
direction_is_forward=False,
|
||||
).to_token()
|
||||
|
||||
if since_token:
|
||||
results["next_batch"] = since_token.copy_and_replace(
|
||||
direction_is_forward=True,
|
||||
current_limit=since_token.current_limit - 1,
|
||||
).to_token()
|
||||
|
||||
return results
|
||||
return response
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _append_room_entry_to_chunk(
|
||||
@@ -587,7 +499,6 @@ class RoomListNextBatch(
|
||||
),
|
||||
)
|
||||
):
|
||||
|
||||
KEY_DICT = {
|
||||
"stream_ordering": "s",
|
||||
"public_room_stream_id": "p",
|
||||
|
||||
@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
|
||||
# The current position in the current_state_delta stream
|
||||
self.pos = None
|
||||
|
||||
# Guard to ensure we only process deltas one at a time
|
||||
self._is_processing = False
|
||||
|
||||
if hs.config.stats_enabled:
|
||||
self.notifier.add_replication_callback(self.notify_new_event)
|
||||
|
||||
@@ -65,43 +62,60 @@ class StatsHandler(StateDeltasHandler):
|
||||
if not self.hs.config.stats_enabled:
|
||||
return
|
||||
|
||||
if self._is_processing:
|
||||
return
|
||||
lock = self.store.stats_delta_processing_lock
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def process():
|
||||
yield lock.acquire()
|
||||
try:
|
||||
yield self._unsafe_process()
|
||||
finally:
|
||||
self._is_processing = False
|
||||
yield lock.release()
|
||||
|
||||
self._is_processing = True
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
if not lock.locked:
|
||||
# we only want to run this process one-at-a-time,
|
||||
# and also, if the initial background updater wants us to keep out,
|
||||
# we should respect that.
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _unsafe_process(self):
|
||||
# If self.pos is None then means we haven't fetched it from DB
|
||||
if self.pos is None:
|
||||
self.pos = yield self.store.get_stats_stream_pos()
|
||||
# If None is one of the values, then this means that the stats regenerator has not (or had not) yet unwedged us
|
||||
# but note that this might be outdated, so we retrieve the positions again.
|
||||
if self.pos is None or None in self.pos.values():
|
||||
self.pos = yield self.store.get_stats_positions()
|
||||
|
||||
# If still None then the initial background update hasn't happened yet
|
||||
if self.pos is None:
|
||||
# If still contains a None position, then the stats regenerator hasn't started yet
|
||||
if None in self.pos.values():
|
||||
return None
|
||||
|
||||
# Loop round handling deltas until we're up to date
|
||||
|
||||
while True:
|
||||
with Measure(self.clock, "stats_delta"):
|
||||
deltas = yield self.store.get_current_state_deltas(self.pos)
|
||||
if not deltas:
|
||||
return
|
||||
deltas = yield self.store.get_current_state_deltas(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
|
||||
logger.info("Handling %d state deltas", len(deltas))
|
||||
logger.debug("Handling %d state deltas", len(deltas))
|
||||
yield self._handle_deltas(deltas)
|
||||
|
||||
self.pos = deltas[-1]["stream_id"]
|
||||
yield self.store.update_stats_stream_pos(self.pos)
|
||||
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
|
||||
yield self.store.update_stats_positions(self.pos)
|
||||
|
||||
event_processing_positions.labels("stats").set(self.pos)
|
||||
event_processing_positions.labels("stats").set(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
|
||||
# Then count deltas for total_events and total_event_bytes.
|
||||
with Measure(self.clock, "stats_total_events_and_bytes"):
|
||||
self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes(
|
||||
self.pos
|
||||
)
|
||||
|
||||
if not deltas and not had_counts:
|
||||
break
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_deltas(self, deltas):
|
||||
@@ -119,7 +133,7 @@ class StatsHandler(StateDeltasHandler):
|
||||
|
||||
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
|
||||
|
||||
token = yield self.store.get_earliest_token_for_room_stats(room_id)
|
||||
token = yield self.store.get_earliest_token_for_stats("room", room_id)
|
||||
|
||||
# If the earliest token to begin from is larger than our current
|
||||
# stream ID, skip processing this delta.
|
||||
@@ -131,7 +145,10 @@ class StatsHandler(StateDeltasHandler):
|
||||
continue
|
||||
|
||||
if event_id is None and prev_event_id is None:
|
||||
# Errr...
|
||||
logger.error(
|
||||
"event ID is None and so is the previous event ID. stream_id: %s",
|
||||
stream_id,
|
||||
)
|
||||
continue
|
||||
|
||||
event_content = {}
|
||||
@@ -143,92 +160,87 @@ class StatsHandler(StateDeltasHandler):
|
||||
|
||||
# We use stream_pos here rather than fetch by event_id as event_id
|
||||
# may be None
|
||||
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
|
||||
stream_timestamp = yield self.store.get_received_ts_by_stream_pos(
|
||||
stream_pos
|
||||
)
|
||||
stream_timestamp = int(stream_timestamp)
|
||||
|
||||
# quantise time to the nearest bucket
|
||||
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
|
||||
# All the values in this dict are deltas (RELATIVE changes)
|
||||
room_stats_delta = {}
|
||||
is_newly_created = False
|
||||
|
||||
if prev_event_id is None:
|
||||
# this state event doesn't overwrite another,
|
||||
# so it is a new effective/current state event
|
||||
room_stats_delta["current_state_events"] = 1
|
||||
|
||||
if typ == EventTypes.Member:
|
||||
# we could use _get_key_change here but it's a bit inefficient
|
||||
# given we're not testing for a specific result; might as well
|
||||
# just grab the prev_membership and membership strings and
|
||||
# compare them.
|
||||
prev_event_content = {}
|
||||
# We take None rather than leave as a previous membership
|
||||
# in the absence of a previous event because we do not want to
|
||||
# reduce the leave count when a new-to-the-room user joins.
|
||||
prev_membership = None
|
||||
if prev_event_id is not None:
|
||||
prev_event = yield self.store.get_event(
|
||||
prev_event_id, allow_none=True
|
||||
)
|
||||
if prev_event:
|
||||
prev_event_content = prev_event.content
|
||||
prev_membership = prev_event_content.get(
|
||||
"membership", Membership.LEAVE
|
||||
)
|
||||
|
||||
membership = event_content.get("membership", Membership.LEAVE)
|
||||
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
|
||||
|
||||
if prev_membership == membership:
|
||||
continue
|
||||
|
||||
if prev_membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", -1
|
||||
)
|
||||
if prev_membership is None:
|
||||
logger.debug("No previous membership for this user.")
|
||||
elif membership == prev_membership:
|
||||
pass # noop
|
||||
elif prev_membership == Membership.JOIN:
|
||||
room_stats_delta["joined_members"] = -1
|
||||
elif prev_membership == Membership.INVITE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "invited_members", -1
|
||||
)
|
||||
room_stats_delta["invited_members"] = -1
|
||||
elif prev_membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "left_members", -1
|
||||
)
|
||||
room_stats_delta["left_members"] = -1
|
||||
elif prev_membership == Membership.BAN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "banned_members", -1
|
||||
)
|
||||
room_stats_delta["banned_members"] = -1
|
||||
else:
|
||||
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
|
||||
logger.error(err)
|
||||
raise ValueError(err)
|
||||
raise ValueError(
|
||||
"%r is not a valid prev_membership" % (prev_membership,)
|
||||
)
|
||||
|
||||
if membership == prev_membership:
|
||||
pass # noop
|
||||
if membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", +1
|
||||
)
|
||||
room_stats_delta["joined_members"] = +1
|
||||
elif membership == Membership.INVITE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "invited_members", +1
|
||||
)
|
||||
room_stats_delta["invited_members"] = +1
|
||||
elif membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "left_members", +1
|
||||
)
|
||||
room_stats_delta["left_members"] = +1
|
||||
elif membership == Membership.BAN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "banned_members", +1
|
||||
)
|
||||
room_stats_delta["banned_members"] = +1
|
||||
else:
|
||||
err = "%s is not a valid membership" % (repr(membership),)
|
||||
logger.error(err)
|
||||
raise ValueError(err)
|
||||
raise ValueError("%r is not a valid membership" % (membership,))
|
||||
|
||||
user_id = state_key
|
||||
if self.is_mine_id(user_id):
|
||||
# update user_stats as it's one of our users
|
||||
public = yield self._is_public_room(room_id)
|
||||
# this accounts for transitions like leave → ban and so on.
|
||||
has_changed_joinedness = (prev_membership == Membership.JOIN) != (
|
||||
membership == Membership.JOIN
|
||||
)
|
||||
|
||||
if has_changed_joinedness:
|
||||
# update user_stats as it's one of our users
|
||||
public = yield self._is_public_room(room_id)
|
||||
|
||||
field = "public_rooms" if public else "private_rooms"
|
||||
delta = +1 if membership == Membership.JOIN else -1
|
||||
|
||||
if membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"user",
|
||||
user_id,
|
||||
"public_rooms" if public else "private_rooms",
|
||||
-1,
|
||||
)
|
||||
elif membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"user",
|
||||
user_id,
|
||||
"public_rooms" if public else "private_rooms",
|
||||
+1,
|
||||
stream_timestamp, "user", user_id, {field: delta}
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Create:
|
||||
@@ -246,28 +258,50 @@ class StatsHandler(StateDeltasHandler):
|
||||
},
|
||||
)
|
||||
|
||||
is_newly_created = True
|
||||
|
||||
elif typ == EventTypes.JoinRules:
|
||||
old_room_state = yield self.store.get_room_state(room_id)
|
||||
yield self.store.update_room_state(
|
||||
room_id, {"join_rules": event_content.get("join_rule")}
|
||||
)
|
||||
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
# whether the room would be public anyway,
|
||||
# because of history_visibility
|
||||
other_field_gives_publicity = (
|
||||
old_room_state["history_visibility"] == "world_readable"
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
|
||||
if not other_field_gives_publicity:
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(
|
||||
stream_timestamp, room_id, is_public
|
||||
)
|
||||
|
||||
elif typ == EventTypes.RoomHistoryVisibility:
|
||||
old_room_state = yield self.store.get_room_state(room_id)
|
||||
yield self.store.update_room_state(
|
||||
room_id,
|
||||
{"history_visibility": event_content.get("history_visibility")},
|
||||
)
|
||||
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
# whether the room would be public anyway,
|
||||
# because of join_rule
|
||||
other_field_gives_publicity = (
|
||||
old_room_state["join_rules"] == JoinRules.PUBLIC
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
|
||||
if not other_field_gives_publicity:
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(
|
||||
stream_timestamp, room_id, is_public
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Encryption:
|
||||
yield self.store.update_room_state(
|
||||
@@ -290,6 +324,20 @@ class StatsHandler(StateDeltasHandler):
|
||||
room_id, {"canonical_alias": event_content.get("alias")}
|
||||
)
|
||||
|
||||
if is_newly_created:
|
||||
yield self.store.update_stats_delta(
|
||||
stream_timestamp,
|
||||
"room",
|
||||
room_id,
|
||||
room_stats_delta,
|
||||
complete_with_stream_id=stream_id,
|
||||
)
|
||||
|
||||
elif len(room_stats_delta) > 0:
|
||||
yield self.store.update_stats_delta(
|
||||
stream_timestamp, "room", room_id, room_stats_delta
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_public_room_stats(self, ts, room_id, is_public):
|
||||
"""
|
||||
@@ -308,10 +356,13 @@ class StatsHandler(StateDeltasHandler):
|
||||
for user_id in user_ids:
|
||||
if self.hs.is_mine(UserID.from_string(user_id)):
|
||||
yield self.store.update_stats_delta(
|
||||
ts, "user", user_id, "public_rooms", +1 if is_public else -1
|
||||
)
|
||||
yield self.store.update_stats_delta(
|
||||
ts, "user", user_id, "private_rooms", -1 if is_public else +1
|
||||
ts,
|
||||
"user",
|
||||
user_id,
|
||||
{
|
||||
"public_rooms": +1 if is_public else -1,
|
||||
"private_rooms": -1 if is_public else +1,
|
||||
},
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -350,6 +350,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
|
||||
limit = parse_integer(request, "limit", 0)
|
||||
since_token = parse_string(request, "since", None)
|
||||
|
||||
if limit == 0:
|
||||
# zero is a special value which corresponds to no limit.
|
||||
limit = None
|
||||
|
||||
handler = self.hs.get_room_list_handler()
|
||||
if server:
|
||||
data = yield handler.get_remote_public_room_list(
|
||||
@@ -387,6 +391,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
|
||||
else:
|
||||
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
|
||||
|
||||
if limit == 0:
|
||||
# zero is a special value which corresponds to no limit.
|
||||
limit = None
|
||||
|
||||
handler = self.hs.get_room_list_handler()
|
||||
if server:
|
||||
data = yield handler.get_remote_public_room_list(
|
||||
|
||||
@@ -2270,8 +2270,9 @@ class EventsStore(
|
||||
"room_aliases",
|
||||
"room_depth",
|
||||
"room_memberships",
|
||||
"room_state",
|
||||
"room_stats",
|
||||
"room_stats_state",
|
||||
"room_stats_current",
|
||||
"room_stats_historical",
|
||||
"room_stats_earliest_token",
|
||||
"rooms",
|
||||
"stream_ordering_to_exterm",
|
||||
|
||||
@@ -869,6 +869,17 @@ class RegistrationStore(
|
||||
(user_id_obj.localpart, create_profile_with_displayname),
|
||||
)
|
||||
|
||||
if self.hs.config.stats_enabled:
|
||||
# we create a new completed user statistics row
|
||||
|
||||
# we don't strictly need current_token since this user really can't
|
||||
# have any state deltas before now (as it is a new user), but still,
|
||||
# we include it for completeness.
|
||||
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
|
||||
self._update_stats_delta_txn(
|
||||
txn, now, "user", user_id, {}, complete_with_stream_id=current_token
|
||||
)
|
||||
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
txn.call_after(self.is_guest.invalidate, (user_id,))
|
||||
|
||||
@@ -1140,6 +1151,7 @@ class RegistrationStore(
|
||||
deferred str|None: A str representing a link to redirect the user
|
||||
to if there is one.
|
||||
"""
|
||||
|
||||
# Insert everything into a transaction in order to run atomically
|
||||
def validate_threepid_session_txn(txn):
|
||||
row = self._simple_select_one_txn(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -161,6 +162,196 @@ class RoomWorkerStore(SQLBaseStore):
|
||||
"get_public_room_changes", get_public_room_changes_txn
|
||||
)
|
||||
|
||||
def count_public_rooms(self):
    """
    Counts the rooms that appear in this homeserver's public room directory.

    The count is taken over room_stats_current joined with room_stats_state
    and rooms, so a room is only counted if it has a row in all three tables.
    A room is counted when is_public is set AND it is publicly joinable
    and/or world-readable.

    Returns:
        the result of running the count query through runInteraction
        (the number of matching rooms).
    """

    # The query takes no parameters, so it is built once, outside the
    # transaction function.
    count_sql = """
        SELECT COUNT(*)
        FROM room_stats_current
        JOIN room_stats_state USING (room_id)
        JOIN rooms USING (room_id)
        WHERE
            is_public
            AND (
                join_rules = 'public'
                OR history_visibility = 'world_readable'
            )
    """

    def _fetch_public_room_count(txn):
        txn.execute(count_sql)
        # COUNT(*) always yields exactly one row; its first column is the count.
        count_row = txn.fetchone()
        return count_row[0]

    return self.runInteraction("count_public_rooms", _fetch_public_room_count)
|
||||
|
||||
@defer.inlineCallbacks
def get_largest_public_rooms(
    self,
    network_tuple,
    search_filter,
    limit,
    pagination_token,
    forwards,
    fetch_creation_event_ids=False,
):
    """Gets the largest public rooms (where largest is in terms of joined
    members, as tracked in the statistics table).

    Args:
        network_tuple (ThirdPartyInstanceID|None): if given, only rooms whose
            appservice_id and network_id match the tuple are returned; a
            falsy component of the tuple is matched with IS NULL.
        search_filter (dict|None): if it carries a non-empty
            "generic_search_term", only rooms whose name, topic or
            canonical alias contain that term (SQL LIKE) are returned.
        limit (int|None): Maximum number of rows to return, unlimited otherwise.
        pagination_token (str|None): if present, a room ID which is to be
            the (first/last) included in the results.
        forwards (bool): true iff going forwards, going backwards otherwise
        fetch_creation_event_ids (bool): if true, room creation_event_ids will
            be included in the results.

    Returns:
        Deferred: the matching rows, as produced by cursor_to_dict.
        Rooms in order: biggest number of joined users first.
        We then arbitrarily use the room_id as a tie breaker.
    """

    # TODO we probably want to use full text search on Postgres?

    sql = """
        SELECT
            room_id, name, topic, canonical_alias, joined_members,
            avatar, history_visibility
    """

    if fetch_creation_event_ids:
        sql += """
            , cse_create.event_id AS creation_event_id
        """

    sql += """
        FROM
            room_stats_current
            JOIN room_stats_state USING (room_id)
            JOIN rooms USING (room_id)
    """
    # Positional arguments for the `?` placeholders, appended in the same
    # order as the clauses that use them are added to `sql`.
    query_args = []

    if network_tuple:
        sql += """
            LEFT JOIN appservice_room_list arl USING (room_id)
        """

    if fetch_creation_event_ids:
        sql += """
            LEFT JOIN current_state_events cse_create USING (room_id)
        """

    sql += """
        WHERE
            is_public
            AND (
                join_rules = 'public'
                OR history_visibility = 'world_readable'
            )
    """

    if fetch_creation_event_ids:
        sql += """
            AND cse_create.type = 'm.room.create'
            AND cse_create.state_key = ''
        """

    if pagination_token:
        # The token is a room ID; look up its member count so that we can
        # resume from its position in the (joined_members, room_id) ordering.
        pt_joined = yield self._simple_select_one_onecol(
            table="room_stats_current",
            keyvalues={"room_id": pagination_token},
            retcol="joined_members",
            desc="get_largest_public_rooms",
        )

        if forwards:
            sql += """
                AND (
                    (joined_members < ?)
                    OR (joined_members = ? AND room_id >= ?)
                )
            """
        else:
            sql += """
                AND (
                    (joined_members > ?)
                    OR (joined_members = ? AND room_id <= ?)
                )
            """
        query_args += [pt_joined, pt_joined, pagination_token]

    if search_filter and search_filter.get("generic_search_term"):
        # NOTE(review): `%` and `_` inside the search term are not escaped,
        # so they act as LIKE wildcards -- confirm whether that is intended.
        search_term = "%" + search_filter["generic_search_term"] + "%"
        sql += """
            AND (
                name LIKE ?
                OR topic LIKE ?
                OR canonical_alias LIKE ?
            )
        """
        query_args += [search_term, search_term, search_term]

    if network_tuple:
        sql += "AND ("
        if network_tuple.appservice_id:
            sql += "appservice_id = ? AND "
            query_args.append(network_tuple.appservice_id)
        else:
            sql += "appservice_id IS NULL AND "

        if network_tuple.network_id:
            sql += "network_id = ?)"
            query_args.append(network_tuple.network_id)
        else:
            sql += "network_id IS NULL)"

    if forwards:
        sql += """
            ORDER BY
                joined_members DESC, room_id ASC
        """
    else:
        # Going backwards we walk the ordering in reverse; the rows are
        # flipped back into "largest first" order after fetching.
        sql += """
            ORDER BY
                joined_members ASC, room_id DESC
        """

    if limit is not None:
        # Bind the limit as a query parameter instead of formatting it into
        # the SQL text: this does not rely on an `assert` (which is stripped
        # under -O) to keep arbitrary input out of the statement. LIMIT is
        # the final clause, so its argument goes last.
        sql += """
            LIMIT ?
        """
        query_args.append(limit)

    def _get_largest_public_rooms_txn(txn):
        txn.execute(sql, query_args)

        results = self.cursor_to_dict(txn)

        if not forwards:
            results.reverse()

        return results

    ret_val = yield self.runInteraction(
        "get_largest_public_rooms", _get_largest_public_rooms_txn
    )
    defer.returnValue(ret_val)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def is_room_blocked(self, room_id):
|
||||
return self._simple_select_one_onecol(
|
||||
|
||||
144
synapse/storage/schema/delta/56/stats_separated1.sql
Normal file
144
synapse/storage/schema/delta/56/stats_separated1.sql
Normal file
@@ -0,0 +1,144 @@
|
||||
/* Copyright 2018 New Vector Ltd
|
||||
* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
----- First clean up from previous versions of room stats.
|
||||
|
||||
-- First remove old stats stuff
|
||||
DROP TABLE IF EXISTS room_stats;
|
||||
DROP TABLE IF EXISTS user_stats;
|
||||
DROP TABLE IF EXISTS room_stats_earliest_tokens;
|
||||
DROP TABLE IF EXISTS _temp_populate_stats_position;
|
||||
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
|
||||
DROP TABLE IF EXISTS stats_stream_pos;
|
||||
|
||||
-- Unschedule old background updates if they're still scheduled
|
||||
DELETE FROM background_updates WHERE update_name IN (
|
||||
'populate_stats_createtables',
|
||||
'populate_stats_process_rooms',
|
||||
'populate_stats_cleanup'
|
||||
);
|
||||
|
||||
----- Create tables for our version of room stats.
|
||||
|
||||
-- single-row table to track position of incremental updates
|
||||
CREATE TABLE IF NOT EXISTS stats_incremental_position (
|
||||
-- the stream_id of the last-processed state delta
|
||||
state_delta_stream_id BIGINT,
|
||||
|
||||
-- the stream_ordering of the last-processed backfilled event
|
||||
-- (this is negative)
|
||||
total_events_min_stream_ordering BIGINT,
|
||||
|
||||
-- the stream_ordering of the last-processed normally-created event
|
||||
-- (this is positive)
|
||||
total_events_max_stream_ordering BIGINT,
|
||||
|
||||
-- If true, this represents the contract agreed upon by the stats
|
||||
-- regenerator.
|
||||
-- If false, this is suitable for use by the delta/incremental processor.
|
||||
is_background_contract BOOLEAN NOT NULL PRIMARY KEY
|
||||
);
|
||||
|
||||
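-- reset the table so that it holds exactly two all-NULL position rows:
-- one for the incremental processor (is_background_contract false) and
-- one for the stats regenerator (is_background_contract true).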
|
||||
DELETE FROM stats_incremental_position;
|
||||
INSERT INTO stats_incremental_position (
|
||||
state_delta_stream_id,
|
||||
total_events_min_stream_ordering,
|
||||
total_events_max_stream_ordering,
|
||||
is_background_contract
|
||||
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
|
||||
|
||||
-- represents PRESENT room statistics for a room
|
||||
-- only holds absolute fields
|
||||
CREATE TABLE IF NOT EXISTS room_stats_current (
|
||||
room_id TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
total_event_bytes BIGINT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
banned_members INT NOT NULL,
|
||||
|
||||
-- If initial stats regen is still to be performed: NULL
|
||||
-- If initial stats regen has been performed: the maximum delta stream
|
||||
-- position that this row takes into account.
|
||||
completed_delta_stream_id BIGINT
|
||||
);
|
||||
|
||||
|
||||
-- represents HISTORICAL room statistics for a room
|
||||
CREATE TABLE IF NOT EXISTS room_stats_historical (
|
||||
room_id TEXT NOT NULL,
|
||||
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
|
||||
-- Note that end_ts is quantised.
|
||||
end_ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
total_event_bytes BIGINT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
banned_members INT NOT NULL,
|
||||
|
||||
PRIMARY KEY (room_id, end_ts)
|
||||
);
|
||||
|
||||
-- We use this index to speed up deletion of ancient room stats.
|
||||
CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
|
||||
|
||||
-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
|
||||
-- out for us. (We would want it to review stats for a particular room.)
|
||||
|
||||
|
||||
-- represents PRESENT statistics for a user
|
||||
-- only holds absolute fields
|
||||
CREATE TABLE IF NOT EXISTS user_stats_current (
|
||||
user_id TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
public_rooms INT NOT NULL,
|
||||
private_rooms INT NOT NULL,
|
||||
|
||||
-- If initial stats regen is still to be performed: NULL
|
||||
-- If initial stats regen has been performed: the maximum delta stream
|
||||
-- position that this row takes into account.
|
||||
completed_delta_stream_id BIGINT
|
||||
);
|
||||
|
||||
-- represents HISTORICAL statistics for a user
|
||||
CREATE TABLE IF NOT EXISTS user_stats_historical (
|
||||
user_id TEXT NOT NULL,
|
||||
end_ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
|
||||
public_rooms INT NOT NULL,
|
||||
private_rooms INT NOT NULL,
|
||||
|
||||
PRIMARY KEY (user_id, end_ts)
|
||||
);
|
||||
|
||||
-- We use this index to speed up deletion of ancient user stats.
|
||||
CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
|
||||
|
||||
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
|
||||
-- out for us. (We would want it to review stats for a particular user.)
|
||||
|
||||
-- Also rename room_state to room_stats_state to make its ownership clear.
|
||||
ALTER TABLE room_state RENAME TO room_stats_state;
|
||||
@@ -0,0 +1,24 @@
|
||||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- These partial indices help us find incomplete stats rows
|
||||
CREATE INDEX IF NOT EXISTS room_stats_not_complete
|
||||
ON room_stats_current (room_id)
|
||||
WHERE completed_delta_stream_id IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS user_stats_not_complete
|
||||
ON user_stats_current (user_id)
|
||||
WHERE completed_delta_stream_id IS NULL;
|
||||
|
||||
27
synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
Normal file
27
synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
Normal file
@@ -0,0 +1,27 @@
|
||||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- even though SQLite >= 3.8 can support partial indices, we won't enable
|
||||
-- them, in case the SQLite database may be later used on another system.
|
||||
-- It's also the case that SQLite is only likely to be used in small
|
||||
-- deployments or testing, where the optimisations gained by use of a
|
||||
-- partial index are not a big concern.
|
||||
|
||||
CREATE INDEX IF NOT EXISTS room_stats_not_complete
|
||||
ON room_stats_current (completed_delta_stream_id, room_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS user_stats_not_complete
|
||||
ON user_stats_current (completed_delta_stream_id, user_id);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018, 2019 New Vector Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -14,17 +15,20 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from itertools import chain
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import DeferredLock
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.storage.prepare_database import get_statements
|
||||
from synapse.storage import PostgresEngine
|
||||
from synapse.storage.state_deltas import StateDeltasStore
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# these fields track absolutes (e.g. total number of rooms on the server)
|
||||
# You can think of these as Prometheus Gauges.
|
||||
# You can draw these stats on a line graph.
|
||||
# Example: number of users in a room
|
||||
ABSOLUTE_STATS_FIELDS = {
|
||||
"room": (
|
||||
"current_state_events",
|
||||
@@ -32,14 +36,18 @@ ABSOLUTE_STATS_FIELDS = {
|
||||
"invited_members",
|
||||
"left_members",
|
||||
"banned_members",
|
||||
"state_events",
|
||||
"total_events",
|
||||
"total_event_bytes",
|
||||
),
|
||||
"user": ("public_rooms", "private_rooms"),
|
||||
}
|
||||
|
||||
TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
||||
# these fields are per-timeslice and so should be reset to 0 upon a new slice
|
||||
# You can draw these stats on a histogram.
|
||||
# Example: number of events sent locally during a time slice
|
||||
PER_SLICE_FIELDS = {"room": (), "user": ()}
|
||||
|
||||
TEMP_TABLE = "_temp_populate_stats"
|
||||
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
||||
|
||||
|
||||
class StatsStore(StateDeltasStore):
|
||||
@@ -51,291 +59,111 @@ class StatsStore(StateDeltasStore):
|
||||
self.stats_enabled = hs.config.stats_enabled
|
||||
self.stats_bucket_size = hs.config.stats_bucket_size
|
||||
|
||||
self.register_background_update_handler(
|
||||
"populate_stats_createtables", self._populate_stats_createtables
|
||||
)
|
||||
self.register_background_update_handler(
|
||||
"populate_stats_process_rooms", self._populate_stats_process_rooms
|
||||
)
|
||||
self.register_background_update_handler(
|
||||
"populate_stats_cleanup", self._populate_stats_cleanup
|
||||
)
|
||||
self.stats_delta_processing_lock = DeferredLock()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_stats_createtables(self, progress, batch_size):
|
||||
self.register_noop_background_update("populate_stats_createtables")
|
||||
self.register_noop_background_update("populate_stats_process_rooms")
|
||||
self.register_noop_background_update("populate_stats_cleanup")
|
||||
|
||||
if not self.stats_enabled:
|
||||
yield self._end_background_update("populate_stats_createtables")
|
||||
return 1
|
||||
|
||||
# Get all the rooms that we want to process.
|
||||
def _make_staging_area(txn):
|
||||
# Create the temporary tables
|
||||
stmts = get_statements(
|
||||
"""
|
||||
-- We just recreate the table, we'll be reinserting the
|
||||
-- correct entries again later anyway.
|
||||
DROP TABLE IF EXISTS {temp}_rooms;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {temp}_rooms(
|
||||
room_id TEXT NOT NULL,
|
||||
events BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX {temp}_rooms_events
|
||||
ON {temp}_rooms(events);
|
||||
CREATE INDEX {temp}_rooms_id
|
||||
ON {temp}_rooms(room_id);
|
||||
""".format(
|
||||
temp=TEMP_TABLE
|
||||
).splitlines()
|
||||
)
|
||||
|
||||
for statement in stmts:
|
||||
txn.execute(statement)
|
||||
|
||||
sql = (
|
||||
"CREATE TABLE IF NOT EXISTS "
|
||||
+ TEMP_TABLE
|
||||
+ "_position(position TEXT NOT NULL)"
|
||||
)
|
||||
txn.execute(sql)
|
||||
|
||||
# Get rooms we want to process from the database, only adding
|
||||
# those that we haven't (i.e. those not in room_stats_earliest_token)
|
||||
sql = """
|
||||
INSERT INTO %s_rooms (room_id, events)
|
||||
SELECT c.room_id, count(*) FROM current_state_events AS c
|
||||
LEFT JOIN room_stats_earliest_token AS t USING (room_id)
|
||||
WHERE t.room_id IS NULL
|
||||
GROUP BY c.room_id
|
||||
""" % (
|
||||
TEMP_TABLE,
|
||||
)
|
||||
txn.execute(sql)
|
||||
|
||||
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
|
||||
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
|
||||
yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
|
||||
self.get_earliest_token_for_room_stats.invalidate_all()
|
||||
|
||||
yield self._end_background_update("populate_stats_createtables")
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_stats_cleanup(self, progress, batch_size):
|
||||
def quantise_stats_time(self, ts):
|
||||
"""
|
||||
Update the user directory stream position, then clean up the old tables.
|
||||
Quantises a timestamp to be a multiple of the bucket size.
|
||||
|
||||
Args:
|
||||
ts (int): the timestamp to quantise, in milliseconds since the Unix
|
||||
Epoch
|
||||
|
||||
Returns:
|
||||
int: a timestamp which
|
||||
- is divisible by the bucket size;
|
||||
- is no later than `ts`; and
|
||||
- is the largest such timestamp.
|
||||
"""
|
||||
if not self.stats_enabled:
|
||||
yield self._end_background_update("populate_stats_cleanup")
|
||||
return 1
|
||||
return (ts // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
position = yield self._simple_select_one_onecol(
|
||||
TEMP_TABLE + "_position", None, "position"
|
||||
)
|
||||
yield self.update_stats_stream_pos(position)
|
||||
|
||||
def _delete_staging_area(txn):
|
||||
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
|
||||
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
|
||||
|
||||
yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
|
||||
|
||||
yield self._end_background_update("populate_stats_cleanup")
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_stats_process_rooms(self, progress, batch_size):
|
||||
|
||||
if not self.stats_enabled:
|
||||
yield self._end_background_update("populate_stats_process_rooms")
|
||||
return 1
|
||||
|
||||
# If we don't have progress filed, delete everything.
|
||||
if not progress:
|
||||
yield self.delete_all_stats()
|
||||
|
||||
def _get_next_batch(txn):
|
||||
# Only fetch 250 rooms, so we don't fetch too many at once, even
|
||||
# if those 250 rooms have less than batch_size state events.
|
||||
sql = """
|
||||
SELECT room_id, events FROM %s_rooms
|
||||
ORDER BY events DESC
|
||||
LIMIT 250
|
||||
""" % (
|
||||
TEMP_TABLE,
|
||||
)
|
||||
txn.execute(sql)
|
||||
rooms_to_work_on = txn.fetchall()
|
||||
|
||||
if not rooms_to_work_on:
|
||||
return None
|
||||
|
||||
# Get how many are left to process, so we can give status on how
|
||||
# far we are in processing
|
||||
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
|
||||
progress["remaining"] = txn.fetchone()[0]
|
||||
|
||||
return rooms_to_work_on
|
||||
|
||||
rooms_to_work_on = yield self.runInteraction(
|
||||
"populate_stats_temp_read", _get_next_batch
|
||||
)
|
||||
|
||||
# No more rooms -- complete the transaction.
|
||||
if not rooms_to_work_on:
|
||||
yield self._end_background_update("populate_stats_process_rooms")
|
||||
return 1
|
||||
|
||||
logger.info(
|
||||
"Processing the next %d rooms of %d remaining",
|
||||
len(rooms_to_work_on),
|
||||
progress["remaining"],
|
||||
)
|
||||
|
||||
# Number of state events we've processed by going through each room
|
||||
processed_event_count = 0
|
||||
|
||||
for room_id, event_count in rooms_to_work_on:
|
||||
|
||||
current_state_ids = yield self.get_current_state_ids(room_id)
|
||||
|
||||
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
|
||||
history_visibility_id = current_state_ids.get(
|
||||
(EventTypes.RoomHistoryVisibility, "")
|
||||
)
|
||||
encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
|
||||
name_id = current_state_ids.get((EventTypes.Name, ""))
|
||||
topic_id = current_state_ids.get((EventTypes.Topic, ""))
|
||||
avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
|
||||
canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
|
||||
|
||||
event_ids = [
|
||||
join_rules_id,
|
||||
history_visibility_id,
|
||||
encryption_id,
|
||||
name_id,
|
||||
topic_id,
|
||||
avatar_id,
|
||||
canonical_alias_id,
|
||||
]
|
||||
|
||||
state_events = yield self.get_events(
|
||||
[ev for ev in event_ids if ev is not None]
|
||||
)
|
||||
|
||||
def _get_or_none(event_id, arg):
|
||||
event = state_events.get(event_id)
|
||||
if event:
|
||||
return event.content.get(arg)
|
||||
return None
|
||||
|
||||
yield self.update_room_state(
|
||||
room_id,
|
||||
{
|
||||
"join_rules": _get_or_none(join_rules_id, "join_rule"),
|
||||
"history_visibility": _get_or_none(
|
||||
history_visibility_id, "history_visibility"
|
||||
),
|
||||
"encryption": _get_or_none(encryption_id, "algorithm"),
|
||||
"name": _get_or_none(name_id, "name"),
|
||||
"topic": _get_or_none(topic_id, "topic"),
|
||||
"avatar": _get_or_none(avatar_id, "url"),
|
||||
"canonical_alias": _get_or_none(canonical_alias_id, "alias"),
|
||||
},
|
||||
)
|
||||
|
||||
now = self.hs.get_reactor().seconds()
|
||||
|
||||
# quantise time to the nearest bucket
|
||||
now = (now // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
def _fetch_data(txn):
|
||||
|
||||
# Get the current token of the room
|
||||
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
|
||||
|
||||
current_state_events = len(current_state_ids)
|
||||
|
||||
membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
|
||||
|
||||
total_state_events = self._get_total_state_event_counts_txn(
|
||||
txn, room_id
|
||||
)
|
||||
|
||||
self._update_stats_txn(
|
||||
txn,
|
||||
"room",
|
||||
room_id,
|
||||
now,
|
||||
{
|
||||
"bucket_size": self.stats_bucket_size,
|
||||
"current_state_events": current_state_events,
|
||||
"joined_members": membership_counts.get(Membership.JOIN, 0),
|
||||
"invited_members": membership_counts.get(Membership.INVITE, 0),
|
||||
"left_members": membership_counts.get(Membership.LEAVE, 0),
|
||||
"banned_members": membership_counts.get(Membership.BAN, 0),
|
||||
"state_events": total_state_events,
|
||||
},
|
||||
)
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
"room_stats_earliest_token",
|
||||
{"room_id": room_id, "token": current_token},
|
||||
)
|
||||
|
||||
# We've finished a room. Delete it from the table.
|
||||
self._simple_delete_one_txn(
|
||||
txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
|
||||
)
|
||||
|
||||
yield self.runInteraction("update_room_stats", _fetch_data)
|
||||
|
||||
# Update the remaining counter.
|
||||
progress["remaining"] -= 1
|
||||
yield self.runInteraction(
|
||||
"populate_stats",
|
||||
self._background_update_progress_txn,
|
||||
"populate_stats_process_rooms",
|
||||
progress,
|
||||
)
|
||||
|
||||
processed_event_count += event_count
|
||||
|
||||
if processed_event_count > batch_size:
|
||||
# Don't process any more rooms, we've hit our batch size.
|
||||
return processed_event_count
|
||||
|
||||
return processed_event_count
|
||||
|
||||
def delete_all_stats(self):
|
||||
def get_stats_positions(self, for_initial_processor=False):
|
||||
"""
|
||||
Delete all statistics records.
|
||||
Returns the stats processor positions.
|
||||
|
||||
Args:
|
||||
for_initial_processor (bool, optional): If true, returns the position
|
||||
promised by the latest stats regeneration, rather than the current
|
||||
incremental processor's position.
|
||||
Otherwise (if false), return the incremental processor's position.
|
||||
|
||||
Returns (dict):
|
||||
Dict containing :-
|
||||
state_delta_stream_id: stream_id of last-processed state delta
|
||||
total_events_min_stream_ordering: stream_ordering of latest-processed
|
||||
backfilled event, in the context of total_events counting.
|
||||
total_events_max_stream_ordering: stream_ordering of latest-processed
|
||||
non-backfilled event, in the context of total_events counting.
|
||||
"""
|
||||
|
||||
def _delete_all_stats_txn(txn):
|
||||
txn.execute("DELETE FROM room_state")
|
||||
txn.execute("DELETE FROM room_stats")
|
||||
txn.execute("DELETE FROM room_stats_earliest_token")
|
||||
txn.execute("DELETE FROM user_stats")
|
||||
|
||||
return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
|
||||
|
||||
def get_stats_stream_pos(self):
|
||||
return self._simple_select_one_onecol(
|
||||
table="stats_stream_pos",
|
||||
keyvalues={},
|
||||
retcol="stream_id",
|
||||
desc="stats_stream_pos",
|
||||
return self._simple_select_one(
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
retcols=(
|
||||
"state_delta_stream_id",
|
||||
"total_events_min_stream_ordering",
|
||||
"total_events_max_stream_ordering",
|
||||
),
|
||||
desc="stats_incremental_position",
|
||||
)
|
||||
|
||||
def update_stats_stream_pos(self, stream_id):
|
||||
def _get_stats_positions_txn(self, txn, for_initial_processor=False):
|
||||
"""
|
||||
See L{get_stats_positions}.
|
||||
|
||||
Args:
|
||||
txn (cursor): Database cursor
|
||||
"""
|
||||
return self._simple_select_one_txn(
|
||||
txn=txn,
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
retcols=(
|
||||
"state_delta_stream_id",
|
||||
"total_events_min_stream_ordering",
|
||||
"total_events_max_stream_ordering",
|
||||
),
|
||||
)
|
||||
|
||||
def update_stats_positions(self, positions, for_initial_processor=False):
|
||||
"""
|
||||
Updates the stats processor positions.
|
||||
|
||||
Args:
|
||||
positions: See L{get_stats_positions}
|
||||
for_initial_processor: See L{get_stats_positions}
|
||||
"""
|
||||
if positions is None:
|
||||
positions = {
|
||||
"state_delta_stream_id": None,
|
||||
"total_events_min_stream_ordering": None,
|
||||
"total_events_max_stream_ordering": None,
|
||||
}
|
||||
return self._simple_update_one(
|
||||
table="stats_stream_pos",
|
||||
keyvalues={},
|
||||
updatevalues={"stream_id": stream_id},
|
||||
desc="update_stats_stream_pos",
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
updatevalues=positions,
|
||||
desc="update_stats_incremental_position",
|
||||
)
|
||||
|
||||
def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
|
||||
"""
|
||||
See L{update_stats_positions}
|
||||
"""
|
||||
if positions is None:
|
||||
positions = {
|
||||
"state_delta_stream_id": None,
|
||||
"total_events_min_stream_ordering": None,
|
||||
"total_events_max_stream_ordering": None,
|
||||
}
|
||||
return self._simple_update_one_txn(
|
||||
txn,
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
updatevalues=positions,
|
||||
)
|
||||
|
||||
def update_room_state(self, room_id, fields):
|
||||
@@ -361,42 +189,14 @@ class StatsStore(StateDeltasStore):
|
||||
fields[col] = None
|
||||
|
||||
return self._simple_upsert(
|
||||
table="room_state",
|
||||
table="room_stats_state",
|
||||
keyvalues={"room_id": room_id},
|
||||
values=fields,
|
||||
desc="update_room_state",
|
||||
)
|
||||
|
||||
def get_deltas_for_room(self, room_id, start, size=100):
|
||||
"""
|
||||
Get statistics deltas for a given room.
|
||||
|
||||
Args:
|
||||
room_id (str)
|
||||
start (int): Pagination start. Number of entries, not timestamp.
|
||||
size (int): How many entries to return.
|
||||
|
||||
Returns:
|
||||
Deferred[list[dict]], where the dict has the keys of
|
||||
ABSOLUTE_STATS_FIELDS["room"] and "ts".
|
||||
"""
|
||||
return self._simple_select_list_paginate(
|
||||
"room_stats",
|
||||
{"room_id": room_id},
|
||||
"ts",
|
||||
start,
|
||||
size,
|
||||
retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
|
||||
order_direction="DESC",
|
||||
)
|
||||
|
||||
def get_all_room_state(self):
|
||||
return self._simple_select_list(
|
||||
"room_state", None, retcols=("name", "topic", "canonical_alias")
|
||||
)
|
||||
|
||||
@cached()
|
||||
def get_earliest_token_for_room_stats(self, room_id):
|
||||
def get_earliest_token_for_stats(self, stats_type, id):
|
||||
"""
|
||||
Fetch the "earliest token". This is used by the room stats delta
|
||||
processor to ignore deltas that have been processed between the
|
||||
@@ -406,79 +206,410 @@ class StatsStore(StateDeltasStore):
|
||||
Returns:
|
||||
Deferred[int]
|
||||
"""
|
||||
table, id_col = TYPE_TO_TABLE[stats_type]
|
||||
|
||||
return self._simple_select_one_onecol(
|
||||
"room_stats_earliest_token",
|
||||
{"room_id": room_id},
|
||||
retcol="token",
|
||||
"%s_current" % (table,),
|
||||
{id_col: id},
|
||||
retcol="completed_delta_stream_id",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
def update_stats(self, stats_type, stats_id, ts, fields):
|
||||
table, id_col = TYPE_TO_ROOM[stats_type]
|
||||
return self._simple_upsert(
|
||||
table=table,
|
||||
keyvalues={id_col: stats_id, "ts": ts},
|
||||
values=fields,
|
||||
desc="update_stats",
|
||||
def update_stats_delta(
    self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
):
    """
    Apply a delta (difference/relative change) to the statistics of one
    subject, inside a database interaction.

    Args:
        ts (int): timestamp of the change
        stats_type (str): "room" or "user" – the kind of subject
        stats_id (str): the subject's ID (room ID or user ID)
        fields (dict[str, int]): Deltas of stats values.
        complete_with_stream_id (int, optional):
            If supplied, converts an incomplete row into a complete row,
            with the supplied stream_id marked as the stream_id where the
            row was completed.

    Returns:
        The result of `runInteraction` for `_update_stats_delta_txn`.
    """
    # Positional arguments forwarded verbatim to the transaction function.
    txn_args = (ts, stats_type, stats_id, fields)

    return self.runInteraction(
        "update_stats_delta",
        self._update_stats_delta_txn,
        *txn_args,
        complete_with_stream_id=complete_with_stream_id
    )
|
||||
|
||||
def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
    """
    Upsert the stats row identified by (subject ID, ts) with `fields`.

    Args:
        txn: Database transaction.
        stats_type (str): key into TYPE_TO_ROOM selecting the table and
            its ID column.
        stats_id (str): the subject's ID.
        ts (int): timestamp identifying the row.
        fields (dict): column values to write.

    Returns:
        Whatever `_simple_upsert_txn` returns.
    """
    stats_table, subject_col = TYPE_TO_ROOM[stats_type]
    row_key = {subject_col: stats_id, "ts": ts}
    return self._simple_upsert_txn(
        txn, table=stats_table, keyvalues=row_key, values=fields
    )
|
||||
def _update_stats_delta_txn(
    self,
    txn,
    ts,
    stats_type,
    stats_id,
    fields,
    complete_with_stream_id=None,
    absolute_field_overrides=None,
):
    """
    Transaction body of `update_stats_delta`: applies stat deltas for one
    subject.

    First upserts the subject's row in the `<table>_current` table, then
    (once background updates are reported complete) upserts the row for the
    time slice containing `ts` in the `<table>_historical` table.

    Args:
        txn: Database transaction.
        ts (int): timestamp of the change
        stats_type (str): "room" or "user" – the kind of subject
        stats_id (str): the subject's ID (room ID or user ID)
        fields (dict[str, int]): Deltas of stats values.
        complete_with_stream_id (int, optional):
            If supplied, written to `completed_delta_stream_id` on the
            `_current` row, marking the row as complete.
        absolute_field_overrides (dict[str, int]): Current stats values
            (i.e. not deltas) of absolute fields.
            Does not work with per-slice fields.

    Raises:
        ValueError: if a name in `fields` or `absolute_field_overrides` is
            neither an absolute nor a per-slice field for `stats_type`.
    """
    if absolute_field_overrides is None:
        absolute_field_overrides = {}

    table, id_col = TYPE_TO_TABLE[stats_type]

    # The historical row is keyed by the end of the slice containing `ts`.
    quantised_ts = self.quantise_stats_time(int(ts))
    end_ts = quantised_ts + self.stats_bucket_size

    abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
    slice_field_names = PER_SLICE_FIELDS[stats_type]
    for field in chain(fields.keys(), absolute_field_overrides.keys()):
        if field not in abs_field_names and field not in slice_field_names:
            # guard against potential SQL injection dodginess
            raise ValueError(
                "%s is not a recognised field"
                " for stats type %s" % (field, stats_type)
            )

    # only absolute stats fields are tracked in the `_current` stats tables,
    # so those are the only ones that we process deltas for when
    # we upsert against the `_current` table.

    # This calculates the deltas (`field = field + ?` values)
    # for absolute fields,
    # * defaulting to 0 if not specified
    #     (required for the INSERT part of upserting to work)
    # * omitting overrides specified in `absolute_field_overrides`
    deltas_of_absolute_fields = {
        key: fields.get(key, 0)
        for key in abs_field_names
        if key not in absolute_field_overrides
    }

    if complete_with_stream_id is not None:
        # Copy so the caller's dict is not mutated.
        absolute_field_overrides = absolute_field_overrides.copy()
        absolute_field_overrides[
            "completed_delta_stream_id"
        ] = complete_with_stream_id

    # first upsert the `_current` table
    self._upsert_with_additive_relatives_txn(
        txn=txn,
        table=table + "_current",
        keyvalues={id_col: stats_id},
        absolutes=absolute_field_overrides,
        additive_relatives=deltas_of_absolute_fields,
    )

    # NOTE(review): if has_completed_background_updates() returns a Deferred
    # rather than a bool, this condition is always truthy — confirm.
    if self.has_completed_background_updates():
        # TODO want to check specifically for stats regenerator, not all
        #   background updates…
        # then upsert the `_historical` table.
        # we don't support absolute_fields for per-slice fields as it makes
        # no sense.
        per_slice_additive_relatives = {
            key: fields.get(key, 0) for key in slice_field_names
        }
        self._upsert_copy_from_table_with_additive_relatives_txn(
            txn=txn,
            into_table=table + "_historical",
            keyvalues={id_col: stats_id},
            extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
            extra_dst_keyvalues={"end_ts": end_ts},
            additive_relatives=per_slice_additive_relatives,
            src_table=table + "_current",
            copy_columns=abs_field_names,
            additional_where=" AND completed_delta_stream_id IS NOT NULL",
        )
|
||||
|
||||
def _upsert_with_additive_relatives_txn(
|
||||
self, txn, table, keyvalues, absolutes, additive_relatives
|
||||
):
|
||||
"""Used to update values in the stats tables.
|
||||
|
||||
Args:
|
||||
txn: Transaction
|
||||
table (str): Table name
|
||||
keyvalues (dict[str, any]): Row-identifying key values
|
||||
absolutes (dict[str, any]): Absolute (set) fields
|
||||
additive_relatives (dict[str, int]): Fields that will be added onto
|
||||
if existing row present.
|
||||
"""
|
||||
if self.database_engine.can_native_upsert:
|
||||
absolute_updates = [
|
||||
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
|
||||
for field in absolutes.keys()
|
||||
]
|
||||
|
||||
relative_updates = [
|
||||
"%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
|
||||
% {"table": table, "field": field}
|
||||
for field in additive_relatives.keys()
|
||||
]
|
||||
|
||||
insert_cols = []
|
||||
qargs = []
|
||||
|
||||
for (key, val) in chain(
|
||||
keyvalues.items(), absolutes.items(), additive_relatives.items()
|
||||
):
|
||||
insert_cols.append(key)
|
||||
qargs.append(val)
|
||||
|
||||
sql = """
|
||||
INSERT INTO %(table)s (%(insert_cols_cs)s)
|
||||
VALUES (%(insert_vals_qs)s)
|
||||
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
|
||||
""" % {
|
||||
"table": table,
|
||||
"insert_cols_cs": ", ".join(insert_cols),
|
||||
"insert_vals_qs": ", ".join(
|
||||
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
|
||||
),
|
||||
"key_columns": ", ".join(keyvalues),
|
||||
"updates": ", ".join(chain(absolute_updates, relative_updates)),
|
||||
}
|
||||
|
||||
txn.execute(sql, qargs)
|
||||
else:
|
||||
self.database_engine.lock_table(txn, table)
|
||||
retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
|
||||
current_row = self._simple_select_one_txn(
|
||||
txn, table, keyvalues, retcols, allow_none=True
|
||||
)
|
||||
if current_row is None:
|
||||
merged_dict = {**keyvalues, **absolutes, **additive_relatives}
|
||||
self._simple_insert_txn(txn, table, merged_dict)
|
||||
else:
|
||||
sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
|
||||
table,
|
||||
field,
|
||||
field,
|
||||
id_col,
|
||||
)
|
||||
txn.execute(sql, (value, stats_id, current_ts))
|
||||
for (key, val) in additive_relatives.items():
|
||||
current_row[key] += val
|
||||
current_row.update(absolutes)
|
||||
self._simple_update_one_txn(txn, table, keyvalues, current_row)
|
||||
|
||||
return self.runInteraction("update_stats_delta", _update_stats_delta)
|
||||
def _upsert_copy_from_table_with_additive_relatives_txn(
|
||||
self,
|
||||
txn,
|
||||
into_table,
|
||||
keyvalues,
|
||||
extra_dst_keyvalues,
|
||||
extra_dst_insvalues,
|
||||
additive_relatives,
|
||||
src_table,
|
||||
copy_columns,
|
||||
additional_where="",
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
txn: Transaction
|
||||
into_table (str): The destination table to UPSERT the row into
|
||||
keyvalues (dict[str, any]): Row-identifying key values
|
||||
extra_dst_keyvalues (dict[str, any]): Additional keyvalues
|
||||
for `into_table`.
|
||||
extra_dst_insvalues (dict[str, any]): Additional values to insert
|
||||
on new row creation for `into_table`.
|
||||
additive_relatives (dict[str, any]): Fields that will be added onto
|
||||
if existing row present. (Must be disjoint from copy_columns.)
|
||||
src_table (str): The source table to copy from
|
||||
copy_columns (iterable[str]): The list of columns to copy
|
||||
additional_where (str): Additional SQL for where (prefix with AND
|
||||
if using).
|
||||
"""
|
||||
if self.database_engine.can_native_upsert:
|
||||
ins_columns = chain(
|
||||
keyvalues,
|
||||
copy_columns,
|
||||
additive_relatives,
|
||||
extra_dst_keyvalues,
|
||||
extra_dst_insvalues,
|
||||
)
|
||||
sel_exprs = chain(
|
||||
keyvalues,
|
||||
copy_columns,
|
||||
(
|
||||
"?"
|
||||
for _ in chain(
|
||||
additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
|
||||
)
|
||||
),
|
||||
)
|
||||
keyvalues_where = ("%s = ?" % f for f in keyvalues)
|
||||
|
||||
sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
|
||||
sets_ar = (
|
||||
"%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
|
||||
for f in additive_relatives
|
||||
)
|
||||
|
||||
sql = """
|
||||
INSERT INTO %(into_table)s (%(ins_columns)s)
|
||||
SELECT %(sel_exprs)s
|
||||
FROM %(src_table)s
|
||||
WHERE %(keyvalues_where)s %(additional_where)s
|
||||
ON CONFLICT (%(keyvalues)s)
|
||||
DO UPDATE SET %(sets)s
|
||||
""" % {
|
||||
"into_table": into_table,
|
||||
"ins_columns": ", ".join(ins_columns),
|
||||
"sel_exprs": ", ".join(sel_exprs),
|
||||
"keyvalues_where": " AND ".join(keyvalues_where),
|
||||
"src_table": src_table,
|
||||
"keyvalues": ", ".join(
|
||||
chain(keyvalues.keys(), extra_dst_keyvalues.keys())
|
||||
),
|
||||
"sets": ", ".join(chain(sets_cc, sets_ar)),
|
||||
"additional_where": additional_where,
|
||||
}
|
||||
|
||||
qargs = list(
|
||||
chain(
|
||||
additive_relatives.values(),
|
||||
extra_dst_keyvalues.values(),
|
||||
extra_dst_insvalues.values(),
|
||||
keyvalues.values(),
|
||||
)
|
||||
)
|
||||
txn.execute(sql, qargs)
|
||||
else:
|
||||
self.database_engine.lock_table(txn, into_table)
|
||||
src_row = self._simple_select_one_txn(
|
||||
txn, src_table, keyvalues, copy_columns
|
||||
)
|
||||
dest_current_row = self._simple_select_one_txn(
|
||||
txn,
|
||||
into_table,
|
||||
keyvalues,
|
||||
retcols=list(chain(additive_relatives.keys(), copy_columns)),
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if dest_current_row is None:
|
||||
merged_dict = {
|
||||
**keyvalues,
|
||||
**extra_dst_keyvalues,
|
||||
**extra_dst_insvalues,
|
||||
**src_row,
|
||||
**additive_relatives,
|
||||
}
|
||||
self._simple_insert_txn(txn, into_table, merged_dict)
|
||||
else:
|
||||
for (key, val) in additive_relatives.items():
|
||||
src_row[key] = dest_current_row[key] + val
|
||||
self._simple_update_txn(txn, into_table, keyvalues, src_row)
|
||||
|
||||
def incremental_update_room_total_events_and_bytes(self, in_positions):
    """
    Counts the number of events and total event bytes per-room and then adds
    these to the respective total_events and total_event_bytes room counts.

    Two ranges are processed: from the previously recorded maximum stream
    ordering up to the current maximum, and from the current minimum up to
    the previously recorded minimum.

    Args:
        in_positions (dict): Positions,
            as retrieved from L{get_stats_positions}.

    Returns (Deferred[tuple[dict, bool]]):
        First element (dict):
            The new positions. Note that this is for reference only –
            the new positions WILL be committed by this function.
        Second element (bool):
            true iff there was a change to the positions, false otherwise
    """
    max_key = "total_events_max_stream_ordering"
    min_key = "total_events_min_stream_ordering"

    def _advance_positions_txn(txn):
        # Work on a copy so the caller's dict is left untouched.
        new_positions = in_positions.copy()

        current_max = self.get_room_max_stream_ordering()
        current_min = self.get_room_min_stream_ordering()

        # Forward range: previously recorded max up to the current max.
        self.update_total_event_and_bytes_count_between_txn(
            txn, low_pos=new_positions[max_key], high_pos=current_max
        )
        # Backward range: current min up to the previously recorded min.
        self.update_total_event_and_bytes_count_between_txn(
            txn, low_pos=current_min, high_pos=new_positions[min_key]
        )

        unchanged = (
            new_positions[max_key] == current_max
            and new_positions[min_key] == current_min
        )
        if unchanged:
            return new_positions, False

        new_positions[max_key] = current_max
        new_positions[min_key] = current_min
        self._update_stats_positions_txn(txn, new_positions)
        return new_positions, True

    return self.runInteraction(
        "stats_incremental_total_events_and_bytes", _advance_positions_txn
    )
|
||||
|
||||
def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos):
    """
    Updates the total_events and total_event_bytes counts for rooms,
    in a range of stream_orderings.

    Inclusivity of low_pos and high_pos is dependent upon their signs.
    This makes it intuitive to use this function for both backfilled
    and non-backfilled events.

    Examples:
    (low, high) → (kind)
    (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
    (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
    (-7, 7) → -7 <= … <= 7 (both)

    Args:
        txn: Database transaction.
        low_pos: Low stream ordering
        high_pos: High stream ordering
    """

    if low_pos >= high_pos:
        # nothing to do here.
        return

    now = self.hs.clock.time_msec()

    # we choose comparators based on the signs
    low_comparator = "<=" if low_pos < 0 else "<"
    high_comparator = "<" if high_pos < 0 else "<="

    # The byte-length expression differs per database engine.
    if isinstance(self.database_engine, PostgresEngine):
        new_bytes_expression = "OCTET_LENGTH(json)"
    else:
        new_bytes_expression = "LENGTH(CAST(json AS BLOB))"

    # Named placeholders so each fragment lands in its intended slot; a
    # positional tuple here is easy to get out of order.
    # NOTE(review): if event_json also has a room_id column, the unqualified
    # room_id below is ambiguous — confirm against the schema.
    sql = """
        SELECT room_id, COUNT(*) AS new_events, SUM(%(new_bytes)s) AS new_bytes
        FROM events INNER JOIN event_json USING (event_id)
        WHERE ? %(low_cmp)s stream_ordering AND stream_ordering %(high_cmp)s ?
        GROUP BY room_id
    """ % {
        "new_bytes": new_bytes_expression,
        "low_cmp": low_comparator,
        "high_cmp": high_comparator,
    }

    txn.execute(sql, (low_pos, high_pos))

    # Add each room's newly counted events and bytes onto its stats.
    for room_id, new_events, new_bytes in txn.fetchall():
        self._update_stats_delta_txn(
            txn,
            now,
            "room",
            room_id,
            {"total_events": new_events, "total_event_bytes": new_bytes},
        )
|
||||
|
||||
@@ -1,304 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mock import Mock
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client.v1 import login, room
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class StatsRoomTests(unittest.HomeserverTestCase):
    """
    Tests for room statistics: the initial population done by the
    `populate_stats_*` background updates, and the handling of state deltas
    by the stats handler.
    """

    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor, clock, hs):

        self.store = hs.get_datastore()
        self.handler = self.hs.get_stats_handler()

    def _set_stats_enabled(self, enabled):
        """
        Switch stats on or off, both in the config and on the handler.
        """
        self.hs.config.stats_enabled = enabled
        self.handler.stats_enabled = enabled

    def _insert_background_update(self, update_name, depends_on=None):
        """
        Queue a background update by inserting a row into
        `background_updates`, with empty progress.

        Args:
            update_name (str): name of the background update.
            depends_on (str, optional): name of the update this one depends
                on. The column is left out of the insert when None.
        """
        row = {"update_name": update_name, "progress_json": "{}"}
        if depends_on is not None:
            row["depends_on"] = depends_on
        self.get_success(self.store._simple_insert("background_updates", row))

    def _run_background_updates(self):
        """
        Run queued background updates until the store reports them complete.
        """
        while not self.get_success(self.store.has_completed_background_updates()):
            self.get_success(self.store.do_next_background_update(100), by=0.1)

    def _add_background_updates(self):
        """
        Add the background updates we need to run.
        """
        # Ugh, have to reset this flag
        self.store._all_done = False

        self._insert_background_update("populate_stats_createtables")
        self._insert_background_update(
            "populate_stats_process_rooms",
            depends_on="populate_stats_createtables",
        )
        self._insert_background_update(
            "populate_stats_cleanup", depends_on="populate_stats_process_rooms"
        )

    def test_initial_room(self):
        """
        The background updates will build the table from scratch.
        """
        r = self.get_success(self.store.get_all_room_state())
        self.assertEqual(len(r), 0)

        # Disable stats
        self._set_stats_enabled(False)

        u1 = self.register_user("u1", "pass")
        u1_token = self.login("u1", "pass")

        room_1 = self.helper.create_room_as(u1, tok=u1_token)
        self.helper.send_state(
            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
        )

        # Stats disabled, shouldn't have done anything
        r = self.get_success(self.store.get_all_room_state())
        self.assertEqual(len(r), 0)

        # Enable stats
        self._set_stats_enabled(True)

        # Do the initial population of the stats via the background updates
        self._add_background_updates()
        self._run_background_updates()

        r = self.get_success(self.store.get_all_room_state())

        self.assertEqual(len(r), 1)
        self.assertEqual(r[0]["topic"], "foo")

    def test_initial_earliest_token(self):
        """
        Ingestion via notify_new_event will ignore tokens that the background
        update has already processed.
        """
        self.reactor.advance(86401)

        self._set_stats_enabled(False)

        u1 = self.register_user("u1", "pass")
        u1_token = self.login("u1", "pass")

        u2 = self.register_user("u2", "pass")
        u2_token = self.login("u2", "pass")

        u3 = self.register_user("u3", "pass")
        u3_token = self.login("u3", "pass")

        room_1 = self.helper.create_room_as(u1, tok=u1_token)
        self.helper.send_state(
            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
        )

        # Begin the ingestion by creating the temp tables. This will also store
        # the position that the deltas should begin at, once they take over.
        self._set_stats_enabled(True)
        self.store._all_done = False
        self.get_success(self.store.update_stats_stream_pos(None))

        self._insert_background_update("populate_stats_createtables")
        self._run_background_updates()

        # Now, before the table is actually ingested, add some more events.
        self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
        self.helper.join(room=room_1, user=u2, tok=u2_token)

        # Now do the initial ingestion.
        self._insert_background_update("populate_stats_process_rooms")
        self._insert_background_update(
            "populate_stats_cleanup", depends_on="populate_stats_process_rooms"
        )

        self.store._all_done = False
        self._run_background_updates()

        self.reactor.advance(86401)

        # Now add some more events, triggering ingestion. Because of the stream
        # position being set to before the events sent in the middle, a simpler
        # implementation would reprocess those events, and say there were four
        # users, not three.
        self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
        self.helper.join(room=room_1, user=u3, tok=u3_token)

        # Get the deltas! There should be two -- day 1, and day 2.
        r = self.get_success(self.store.get_deltas_for_room(room_1, 0))

        # The oldest has 2 joined members
        self.assertEqual(r[-1]["joined_members"], 2)

        # The newest has 3
        self.assertEqual(r[0]["joined_members"], 3)

    def test_incorrect_state_transition(self):
        """
        If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
        (JOIN, INVITE, LEAVE, BAN), an error is raised.
        """
        events = {
            "a1": {"membership": Membership.LEAVE},
            "a2": {"membership": "not a real thing"},
        }

        # Fake event lookup: resolves on the next reactor tick with a Mock
        # whose content is the canned dict above.
        def get_event(event_id, allow_none=True):
            m = Mock()
            m.content = events[event_id]
            d = defer.Deferred()
            self.reactor.callLater(0.0, d.callback, m)
            return d

        def get_received_ts(event_id):
            return defer.succeed(1)

        self.store.get_received_ts = get_received_ts
        self.store.get_event = get_event

        deltas = [
            {
                "type": EventTypes.Member,
                "state_key": "some_user",
                "room_id": "room",
                "event_id": "a1",
                "prev_event_id": "a2",
                "stream_id": 60,
            }
        ]

        f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
        self.assertEqual(
            f.value.args[0], "'not a real thing' is not a valid prev_membership"
        )

        # And the other way...
        deltas = [
            {
                "type": EventTypes.Member,
                "state_key": "some_user",
                "room_id": "room",
                "event_id": "a2",
                "prev_event_id": "a1",
                "stream_id": 100,
            }
        ]

        f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
        self.assertEqual(
            f.value.args[0], "'not a real thing' is not a valid membership"
        )

    def test_redacted_prev_event(self):
        """
        If the prev_event does not exist, then it is assumed to be a LEAVE.
        """
        u1 = self.register_user("u1", "pass")
        u1_token = self.login("u1", "pass")

        room_1 = self.helper.create_room_as(u1, tok=u1_token)

        # Do the initial population of the stats via the background updates
        self._add_background_updates()
        self._run_background_updates()

        events = {"a1": None, "a2": {"membership": Membership.JOIN}}

        # Fake event lookup: "a1" has no content, so it resolves to None, as
        # if the event did not exist.
        def get_event(event_id, allow_none=True):
            if events.get(event_id):
                m = Mock()
                m.content = events[event_id]
            else:
                m = None
            d = defer.Deferred()
            self.reactor.callLater(0.0, d.callback, m)
            return d

        def get_received_ts(event_id):
            return defer.succeed(1)

        self.store.get_received_ts = get_received_ts
        self.store.get_event = get_event

        deltas = [
            {
                "type": EventTypes.Member,
                "state_key": "some_user:test",
                "room_id": room_1,
                "event_id": "a2",
                "prev_event_id": "a1",
                "stream_id": 100,
            }
        ]

        # Handle our fake deltas, which has a user going from LEAVE -> JOIN.
        self.get_success(self.handler._handle_deltas(deltas))

        # One delta, with two joined members -- the room creator, and our fake
        # user.
        r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
        self.assertEqual(len(r), 1)
        self.assertEqual(r[0]["joined_members"], 2)
|
||||
Reference in New Issue
Block a user