Compare commits

...

6 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
19d8d3fc81 Don't populate empty/null fields in publicRooms.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-24 11:29:51 +01:00
Olivier Wilkinson (reivilibre)
69f6a46cb5 Use room_stats and room_state for room directory search
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-17 15:50:29 +01:00
Olivier Wilkinson (reivilibre)
8502c668bf Changelog for #5691
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-17 09:46:01 +01:00
Olivier Wilkinson (reivilibre)
dc68c2a101 Update state_events and current_state_events upon receipt of a state
event #5690.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-17 09:46:01 +01:00
Olivier Wilkinson (reivilibre)
181c1a6072 Don't decrease left_members if the user is joining for the first time.
Fixes #5423

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-17 09:46:01 +01:00
Olivier Wilkinson (reivilibre)
20ae4afe7e Create room_stats rows for new rooms. #5624
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-17 09:46:01 +01:00
7 changed files with 314 additions and 175 deletions

1
changelog.d/5691.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix improper or missing room_stats updates when handling state events (deltas).

View File

@@ -764,6 +764,10 @@ class PublicRoomList(BaseFederationServlet):
else:
network_tuple = ThirdPartyInstanceID(None, None)
if limit == 0:
# zero is a special value which corresponds to no limit.
limit = None
data = yield self.handler.get_local_public_room_list(
limit, since_token, network_tuple=network_tuple, from_federation=True
)

View File

@@ -17,16 +17,15 @@ import logging
from collections import namedtuple
from six import PY3, iteritems
from six.moves import range
import msgpack
from unpaddedbase64 import decode_base64, encode_base64
from twisted.internet import defer
from twisted.internet.defer import maybeDeferred
from synapse.api.constants import EventTypes, JoinRules
from synapse.types import ThirdPartyInstanceID
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@@ -36,7 +35,6 @@ logger = logging.getLogger(__name__)
REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
# This is used to indicate we should only return rooms published to the main list.
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
@@ -71,6 +69,8 @@ class RoomListHandler(BaseHandler):
This can be (None, None) to indicate the main list, or a particular
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
from_federation (bool): true iff the request comes from the federation
API
"""
if not self.enable_room_list_search:
return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
@@ -132,200 +132,127 @@ class RoomListHandler(BaseHandler):
from_federation (bool): Whether this request originated from a
federating server or a client. Used for room filtering.
timeout (int|None): Amount of seconds to wait for a response before
timing out.
timing out. TODO
"""
if since_token and since_token != "END":
since_token = RoomListNextBatch.from_token(since_token)
pagination_token = None
if since_token and since_token != "END": # todo ought we support END and START?
if since_token[0] in ("+", "-"):
forwards = since_token[0] == "+"
pagination_token = since_token[1:]
else:
raise SyntaxError("shrug ") # TODO
else:
since_token = None
forwards = True
rooms_to_order_value = {}
rooms_to_num_joined = {}
# we request one more than wanted to see if there are more pages to come
probing_limit = limit + 1 if limit is not None else None
newly_visible = []
newly_unpublished = []
if since_token:
stream_token = since_token.stream_ordering
current_public_id = yield self.store.get_current_public_room_stream_id()
public_room_stream_id = since_token.public_room_stream_id
newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
public_room_stream_id, current_public_id, network_tuple=network_tuple
)
else:
stream_token = yield self.store.get_room_max_stream_ordering()
public_room_stream_id = yield self.store.get_current_public_room_stream_id()
room_ids = yield self.store.get_public_room_ids_at_stream_id(
public_room_stream_id, network_tuple=network_tuple
results = yield self.store.get_largest_public_rooms(
network_tuple, search_filter, probing_limit, pagination_token, forwards
)
# We want to return rooms in a particular order: the number of joined
# users. We then arbitrarily use the room_id as a tie breaker.
def build_room_entry(room):
entry = {
"room_id": room["room_id"],
"name": room["name"],
"topic": room["topic"],
"canonical_alias": room["canonical_alias"],
"num_joined_members": room["joined_members"],
"avatar_url": room["avatar"],
"world_readable": room["history_visibility"] == "world_readable",
}
@defer.inlineCallbacks
def get_order_for_room(room_id):
# Most of the rooms won't have changed between the since token and
# now (especially if the since token is "now"). So, we can ask what
# the current users are in a room (that will hit a cache) and then
# check if the room has changed since the since token. (We have to
# do it in that order to avoid races).
# If things have changed then fall back to getting the current state
# at the since token.
joined_users = yield self.store.get_users_in_room(room_id)
if self.store.has_room_changed_since(room_id, stream_token):
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_token
)
# Filter out Nones rather omit the field altogether
return {
k: v for k, v in entry.items() if v is not None
}
if not latest_event_ids:
return
joined_users = yield self.state_handler.get_current_users_in_room(
room_id, latest_event_ids
)
num_joined_users = len(joined_users)
rooms_to_num_joined[room_id] = num_joined_users
if num_joined_users == 0:
return
# We want larger rooms to be first, hence negating num_joined_users
rooms_to_order_value[room_id] = (-num_joined_users, room_id)
logger.info(
"Getting ordering for %i rooms since %s", len(room_ids), stream_token
)
yield concurrently_execute(get_order_for_room, room_ids, 10)
sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
sorted_rooms = [room_id for room_id, _ in sorted_entries]
# `sorted_rooms` should now be a list of all public room ids that is
# stable across pagination. Therefore, we can use indices into this
# list as our pagination tokens.
# Filter out rooms that we don't want to return
rooms_to_scan = [
r
for r in sorted_rooms
if r not in newly_unpublished and rooms_to_num_joined[r] > 0
results = [
build_room_entry(r) for r in results
]
total_room_count = len(rooms_to_scan)
if since_token:
# Filter out rooms we've already returned previously
# `since_token.current_limit` is the index of the last room we
# sent down, so we exclude it and everything before/after it.
if since_token.direction_is_forward:
rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
response = {}
num_results = len(results)
if num_results > 0:
final_room_id = results[-1]["room_id"]
initial_room_id = results[0]["room_id"]
if limit is not None:
more_to_come = num_results == probing_limit
results = results[0:limit]
else:
rooms_to_scan = rooms_to_scan[: since_token.current_limit]
rooms_to_scan.reverse()
more_to_come = False
logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
if not forwards or (forwards and more_to_come):
response["next_batch"] = "+%s" % (final_room_id,)
# _append_room_entry_to_chunk will append to chunk but will stop if
# len(chunk) > limit
#
# Normally we will generate enough results on the first iteration here,
# but if there is a search filter, _append_room_entry_to_chunk may
# filter some results out, in which case we loop again.
#
# We don't want to scan over the entire range either as that
# would potentially waste a lot of work.
#
# XXX if there is no limit, we may end up DoSing the server with
# calls to get_current_state_ids for every single room on the
# server. Surely we should cap this somehow?
#
if limit:
step = limit + 1
else:
# step cannot be zero
step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
if since_token and (forwards or (not forwards and more_to_come)):
if num_results > 0:
response["prev_batch"] = "-%s" % (initial_room_id,)
else:
response["prev_batch"] = "-%s" % (pagination_token,)
chunk = []
for i in range(0, len(rooms_to_scan), step):
if timeout and self.clock.time() > timeout:
raise Exception("Timed out searching room directory")
if from_federation:
# only show rooms with m.federate=True or absent (default is True)
batch = rooms_to_scan[i : i + step]
logger.info("Processing %i rooms for result", len(batch))
yield concurrently_execute(
lambda r: self._append_room_entry_to_chunk(
r,
rooms_to_num_joined[r],
chunk,
limit,
search_filter,
from_federation=from_federation,
),
batch,
5,
# get rooms' state
room_state_ids = yield defer.gatherResults(
[
maybeDeferred(self.store.get_current_state_ids, room["room_id"])
for room in results
],
consumeErrors=True,
)
logger.info("Now %i rooms in result", len(chunk))
if len(chunk) >= limit + 1:
break
chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
# get rooms' creation state events' IDs
room_creation_event_ids = {
room["room_id"]: event_ids.get((EventTypes.Create, ""))
for (room, event_ids) in zip(results, room_state_ids)
}
# Work out the new limit of the batch for pagination, or None if we
# know there are no more results that would be returned.
# i.e., [since_token.current_limit..new_limit] is the batch of rooms
# we've returned (or the reverse if we paginated backwards)
# We tried to pull out limit + 1 rooms above, so if we have <= limit
# then we know there are no more results to return
new_limit = None
if chunk and (not limit or len(chunk) > limit):
# get rooms' creation state events
creation_events_by_id = yield self.store.get_events(
room_creation_event_ids.values()
)
if not since_token or since_token.direction_is_forward:
if limit:
chunk = chunk[:limit]
last_room_id = chunk[-1]["room_id"]
else:
if limit:
chunk = chunk[-limit:]
last_room_id = chunk[0]["room_id"]
# associate them with the room IDs
room_creation_events = {
room_id: creation_events_by_id[event_id]
for (room_id, event_id) in room_creation_event_ids.items()
}
new_limit = sorted_rooms.index(last_room_id)
# now filter out rooms with m.federate: False in their create event
results = [
room
for room in results
if room_creation_events[room["room_id"]].content.get("m.federate", True)
]
results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
for room in results:
# populate search result entries with additional fields, namely
# 'aliases' and 'guest_can_join'
room_id = room["room_id"]
if since_token:
results["new_rooms"] = bool(newly_visible)
aliases = yield self.store.get_aliases_for_room(room_id)
if aliases:
room["aliases"] = aliases
if not since_token or since_token.direction_is_forward:
if new_limit is not None:
results["next_batch"] = RoomListNextBatch(
stream_ordering=stream_token,
public_room_stream_id=public_room_stream_id,
current_limit=new_limit,
direction_is_forward=True,
).to_token()
state_ids = yield self.store.get_current_state_ids(room_id)
guests_can_join = False
guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
if guest_access_state_id is not None:
guest_access = yield self.store.get_event(guest_access_state_id)
if guest_access is not None:
if guest_access.content.get("guest_access") == "can_join":
guests_can_join = True
room["guest_can_join"] = guests_can_join
if since_token:
results["prev_batch"] = since_token.copy_and_replace(
direction_is_forward=False,
current_limit=since_token.current_limit + 1,
).to_token()
else:
if new_limit is not None:
results["prev_batch"] = RoomListNextBatch(
stream_ordering=stream_token,
public_room_stream_id=public_room_stream_id,
current_limit=new_limit,
direction_is_forward=False,
).to_token()
response["chunk"] = results
if since_token:
results["next_batch"] = since_token.copy_and_replace(
direction_is_forward=True,
current_limit=since_token.current_limit - 1,
).to_token()
# TODO for federation, we currently don't remove m.federate=False rooms
# from the total room count estimate.
response["total_room_count_estimate"] = yield self.store.count_public_rooms()
defer.returnValue(results)
defer.returnValue(response)
@defer.inlineCallbacks
def _append_room_entry_to_chunk(
@@ -560,7 +487,6 @@ class RoomListNextBatch(
),
)
):
KEY_DICT = {
"stream_ordering": "s",
"public_room_stream_id": "p",

View File

@@ -148,26 +148,44 @@ class StatsHandler(StateDeltasHandler):
# quantise time to the nearest bucket
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
yield self.store.update_stats_delta(
now, "room", room_id, "state_events", +1
)
if prev_event_id is None:
# this state event doesn't overwrite another,
# so it is a new effective/current state event
yield self.store.update_stats_delta(
now, "room", room_id, "current_state_events", +1
)
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
prev_event_content = {}
# We take None rather than leave as a previous membership
# in the absence of a previous event because we do not want to
# reduce the leave count when a new-to-the-room user joins.
prev_membership = None
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
prev_membership = prev_event_content.get(
"membership", Membership.LEAVE
)
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
if prev_membership == membership:
continue
if prev_membership == Membership.JOIN:
if prev_membership is None:
logger.debug("No previous membership for this user.")
elif prev_membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", -1
)
@@ -246,6 +264,23 @@ class StatsHandler(StateDeltasHandler):
},
)
# Also add room stats with just the one state event
# (the room creation state event)
yield self.store.update_stats(
"room",
room_id,
now,
{
"bucket_size": self.stats_bucket_size,
"current_state_events": 1,
"joined_members": 0,
"invited_members": 0,
"left_members": 0,
"banned_members": 0,
"state_events": 1,
},
)
elif typ == EventTypes.JoinRules:
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}

View File

@@ -332,6 +332,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
limit = parse_integer(request, "limit", 0)
since_token = parse_string(request, "since", None)
if limit == 0:
# zero is a special value which corresponds to no limit.
limit = None
handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(
@@ -369,6 +373,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
else:
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
if limit == 0:
# zero is a special value which corresponds to no limit.
limit = None
handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -161,6 +162,167 @@ class RoomWorkerStore(SQLBaseStore):
"get_public_room_changes", get_public_room_changes_txn
)
def count_public_rooms(self):
"""
Counts the number of public rooms as tracked in the room_stats and room_state
table.
A public room is one who has is_public set
AND is publicly-joinable and/or world-readable.
Returns:
number of public rooms on this homeserver's room directory
"""
def _count_public_rooms_txn(txn):
sql = """
SELECT COUNT(*)
FROM room_stats
JOIN room_state USING (room_id)
JOIN rooms USING (room_id)
WHERE
is_public
AND (
join_rules = 'public'
OR history_visibility = 'world_readable'
)
"""
txn.execute(sql)
return txn.fetchone()[0]
return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
@defer.inlineCallbacks
def get_largest_public_rooms(
self, network_tuple, search_filter, limit, pagination_token, forwards
):
"""TODO doc this
Args:
network_tuple (ThirdPartyInstanceID|None):
search_filter (dict|None):
limit (int|None): Maxmimum number of rows to return, unlimited otherwise.
pagination_token (str|None): if present, a room ID which is to be
the (first/last) included in the results.
forwards (bool): true iff going forwards, going backwards otherwise
Returns:
Rooms in order: biggest number of joined users first.
We then arbitrarily use the room_id as a tie breaker.
"""
# TODO probably want to use ts_… on Postgres?
sql = """
SELECT
room_id, name, topic, canonical_alias, joined_members,
avatar, history_visibility, joined_members
FROM
room_stats
JOIN room_state USING (room_id)
JOIN rooms USING (room_id)
"""
query_args = []
if network_tuple:
sql += """
LEFT JOIN appservice_room_list arl USING (room_id)
"""
sql += """
WHERE
is_public
AND (
join_rules = 'public'
OR history_visibility = 'world_readable'
)
"""
if pagination_token:
pt_joined = yield self._simple_select_one_onecol(
table="room_stats",
keyvalues={"room_id": pagination_token},
retcol="joined_members",
desc="get_largest_public_rooms",
)
if forwards:
sql += """
AND (
(joined_members < ?)
OR (joined_members = ? AND room_id >= ?)
)
"""
else:
sql += """
AND (
(joined_members > ?)
OR (joined_members = ? AND room_id <= ?)
)
"""
query_args += [pt_joined, pt_joined, pagination_token]
if search_filter and search_filter.get("generic_search_term", None):
search_term = "%" + search_filter["generic_search_term"] + "%"
sql += """
AND (
name LIKE ?
OR topic LIKE ?
OR canonical_alias LIKE ?
)
"""
query_args += [search_term, search_term, search_term]
if network_tuple:
sql += "AND ("
if network_tuple.appservice_id:
sql += "appservice_id = ? AND "
query_args.append(network_tuple.appservice_id)
else:
sql += "appservice_id IS NULL AND "
if network_tuple.network_id:
sql += "network_id = ?)"
query_args.append(network_tuple.network_id)
else:
sql += "network_id IS NULL)"
if forwards:
sql += """
ORDER BY
joined_members DESC, room_id ASC
"""
else:
sql += """
ORDER BY
joined_members ASC, room_id DESC
"""
if limit is not None:
# be cautious about SQL injection
assert isinstance(limit, int)
sql += """
LIMIT %d
""" % (
limit,
)
def _get_largest_public_rooms_txn(txn):
txn.execute(sql, query_args)
results = self.cursor_to_dict(txn)
if not forwards:
results.reverse()
return results
ret_val = yield self.runInteraction(
"get_largest_public_rooms", _get_largest_public_rooms_txn
)
defer.returnValue(ret_val)
@cached(max_entries=10000)
def is_room_blocked(self, room_id):
return self._simple_select_one_onecol(

View File

@@ -68,6 +68,9 @@ class StatsStore(StateDeltasStore):
yield self._end_background_update("populate_stats_createtables")
defer.returnValue(1)
# TODO dev only
yield self.delete_all_stats()
# Get all the rooms that we want to process.
def _make_staging_area(txn):
# Create the temporary tables