mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
6 Commits
madlittlem
...
rei/room_d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
19d8d3fc81 | ||
|
|
69f6a46cb5 | ||
|
|
8502c668bf | ||
|
|
dc68c2a101 | ||
|
|
181c1a6072 | ||
|
|
20ae4afe7e |
1
changelog.d/5691.bugfix
Normal file
1
changelog.d/5691.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix improper or missing room_stats updates when handling state events (deltas).
|
||||
@@ -764,6 +764,10 @@ class PublicRoomList(BaseFederationServlet):
|
||||
else:
|
||||
network_tuple = ThirdPartyInstanceID(None, None)
|
||||
|
||||
if limit == 0:
|
||||
# zero is a special value which corresponds to no limit.
|
||||
limit = None
|
||||
|
||||
data = yield self.handler.get_local_public_room_list(
|
||||
limit, since_token, network_tuple=network_tuple, from_federation=True
|
||||
)
|
||||
|
||||
@@ -17,16 +17,15 @@ import logging
|
||||
from collections import namedtuple
|
||||
|
||||
from six import PY3, iteritems
|
||||
from six.moves import range
|
||||
|
||||
import msgpack
|
||||
from unpaddedbase64 import decode_base64, encode_base64
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import maybeDeferred
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules
|
||||
from synapse.types import ThirdPartyInstanceID
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
|
||||
@@ -36,7 +35,6 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
|
||||
|
||||
|
||||
# This is used to indicate we should only return rooms published to the main list.
|
||||
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
|
||||
|
||||
@@ -71,6 +69,8 @@ class RoomListHandler(BaseHandler):
|
||||
This can be (None, None) to indicate the main list, or a particular
|
||||
appservice and network id to use an appservice specific one.
|
||||
Setting to None returns all public rooms across all lists.
|
||||
from_federation (bool): true iff the request comes from the federation
|
||||
API
|
||||
"""
|
||||
if not self.enable_room_list_search:
|
||||
return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
|
||||
@@ -132,200 +132,127 @@ class RoomListHandler(BaseHandler):
|
||||
from_federation (bool): Whether this request originated from a
|
||||
federating server or a client. Used for room filtering.
|
||||
timeout (int|None): Amount of seconds to wait for a response before
|
||||
timing out.
|
||||
timing out. TODO
|
||||
"""
|
||||
if since_token and since_token != "END":
|
||||
since_token = RoomListNextBatch.from_token(since_token)
|
||||
pagination_token = None
|
||||
if since_token and since_token != "END": # todo ought we support END and START?
|
||||
if since_token[0] in ("+", "-"):
|
||||
forwards = since_token[0] == "+"
|
||||
pagination_token = since_token[1:]
|
||||
else:
|
||||
raise SyntaxError("shrug ") # TODO
|
||||
else:
|
||||
since_token = None
|
||||
forwards = True
|
||||
|
||||
rooms_to_order_value = {}
|
||||
rooms_to_num_joined = {}
|
||||
# we request one more than wanted to see if there are more pages to come
|
||||
probing_limit = limit + 1 if limit is not None else None
|
||||
|
||||
newly_visible = []
|
||||
newly_unpublished = []
|
||||
if since_token:
|
||||
stream_token = since_token.stream_ordering
|
||||
current_public_id = yield self.store.get_current_public_room_stream_id()
|
||||
public_room_stream_id = since_token.public_room_stream_id
|
||||
newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
|
||||
public_room_stream_id, current_public_id, network_tuple=network_tuple
|
||||
)
|
||||
else:
|
||||
stream_token = yield self.store.get_room_max_stream_ordering()
|
||||
public_room_stream_id = yield self.store.get_current_public_room_stream_id()
|
||||
|
||||
room_ids = yield self.store.get_public_room_ids_at_stream_id(
|
||||
public_room_stream_id, network_tuple=network_tuple
|
||||
results = yield self.store.get_largest_public_rooms(
|
||||
network_tuple, search_filter, probing_limit, pagination_token, forwards
|
||||
)
|
||||
|
||||
# We want to return rooms in a particular order: the number of joined
|
||||
# users. We then arbitrarily use the room_id as a tie breaker.
|
||||
def build_room_entry(room):
|
||||
entry = {
|
||||
"room_id": room["room_id"],
|
||||
"name": room["name"],
|
||||
"topic": room["topic"],
|
||||
"canonical_alias": room["canonical_alias"],
|
||||
"num_joined_members": room["joined_members"],
|
||||
"avatar_url": room["avatar"],
|
||||
"world_readable": room["history_visibility"] == "world_readable",
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_order_for_room(room_id):
|
||||
# Most of the rooms won't have changed between the since token and
|
||||
# now (especially if the since token is "now"). So, we can ask what
|
||||
# the current users are in a room (that will hit a cache) and then
|
||||
# check if the room has changed since the since token. (We have to
|
||||
# do it in that order to avoid races).
|
||||
# If things have changed then fall back to getting the current state
|
||||
# at the since token.
|
||||
joined_users = yield self.store.get_users_in_room(room_id)
|
||||
if self.store.has_room_changed_since(room_id, stream_token):
|
||||
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
|
||||
room_id, stream_token
|
||||
)
|
||||
# Filter out Nones – rather omit the field altogether
|
||||
return {
|
||||
k: v for k, v in entry.items() if v is not None
|
||||
}
|
||||
|
||||
if not latest_event_ids:
|
||||
return
|
||||
|
||||
joined_users = yield self.state_handler.get_current_users_in_room(
|
||||
room_id, latest_event_ids
|
||||
)
|
||||
|
||||
num_joined_users = len(joined_users)
|
||||
rooms_to_num_joined[room_id] = num_joined_users
|
||||
|
||||
if num_joined_users == 0:
|
||||
return
|
||||
|
||||
# We want larger rooms to be first, hence negating num_joined_users
|
||||
rooms_to_order_value[room_id] = (-num_joined_users, room_id)
|
||||
|
||||
logger.info(
|
||||
"Getting ordering for %i rooms since %s", len(room_ids), stream_token
|
||||
)
|
||||
yield concurrently_execute(get_order_for_room, room_ids, 10)
|
||||
|
||||
sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
|
||||
sorted_rooms = [room_id for room_id, _ in sorted_entries]
|
||||
|
||||
# `sorted_rooms` should now be a list of all public room ids that is
|
||||
# stable across pagination. Therefore, we can use indices into this
|
||||
# list as our pagination tokens.
|
||||
|
||||
# Filter out rooms that we don't want to return
|
||||
rooms_to_scan = [
|
||||
r
|
||||
for r in sorted_rooms
|
||||
if r not in newly_unpublished and rooms_to_num_joined[r] > 0
|
||||
results = [
|
||||
build_room_entry(r) for r in results
|
||||
]
|
||||
|
||||
total_room_count = len(rooms_to_scan)
|
||||
|
||||
if since_token:
|
||||
# Filter out rooms we've already returned previously
|
||||
# `since_token.current_limit` is the index of the last room we
|
||||
# sent down, so we exclude it and everything before/after it.
|
||||
if since_token.direction_is_forward:
|
||||
rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
|
||||
response = {}
|
||||
num_results = len(results)
|
||||
if num_results > 0:
|
||||
final_room_id = results[-1]["room_id"]
|
||||
initial_room_id = results[0]["room_id"]
|
||||
if limit is not None:
|
||||
more_to_come = num_results == probing_limit
|
||||
results = results[0:limit]
|
||||
else:
|
||||
rooms_to_scan = rooms_to_scan[: since_token.current_limit]
|
||||
rooms_to_scan.reverse()
|
||||
more_to_come = False
|
||||
|
||||
logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
|
||||
if not forwards or (forwards and more_to_come):
|
||||
response["next_batch"] = "+%s" % (final_room_id,)
|
||||
|
||||
# _append_room_entry_to_chunk will append to chunk but will stop if
|
||||
# len(chunk) > limit
|
||||
#
|
||||
# Normally we will generate enough results on the first iteration here,
|
||||
# but if there is a search filter, _append_room_entry_to_chunk may
|
||||
# filter some results out, in which case we loop again.
|
||||
#
|
||||
# We don't want to scan over the entire range either as that
|
||||
# would potentially waste a lot of work.
|
||||
#
|
||||
# XXX if there is no limit, we may end up DoSing the server with
|
||||
# calls to get_current_state_ids for every single room on the
|
||||
# server. Surely we should cap this somehow?
|
||||
#
|
||||
if limit:
|
||||
step = limit + 1
|
||||
else:
|
||||
# step cannot be zero
|
||||
step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
|
||||
if since_token and (forwards or (not forwards and more_to_come)):
|
||||
if num_results > 0:
|
||||
response["prev_batch"] = "-%s" % (initial_room_id,)
|
||||
else:
|
||||
response["prev_batch"] = "-%s" % (pagination_token,)
|
||||
|
||||
chunk = []
|
||||
for i in range(0, len(rooms_to_scan), step):
|
||||
if timeout and self.clock.time() > timeout:
|
||||
raise Exception("Timed out searching room directory")
|
||||
if from_federation:
|
||||
# only show rooms with m.federate=True or absent (default is True)
|
||||
|
||||
batch = rooms_to_scan[i : i + step]
|
||||
logger.info("Processing %i rooms for result", len(batch))
|
||||
yield concurrently_execute(
|
||||
lambda r: self._append_room_entry_to_chunk(
|
||||
r,
|
||||
rooms_to_num_joined[r],
|
||||
chunk,
|
||||
limit,
|
||||
search_filter,
|
||||
from_federation=from_federation,
|
||||
),
|
||||
batch,
|
||||
5,
|
||||
# get rooms' state
|
||||
room_state_ids = yield defer.gatherResults(
|
||||
[
|
||||
maybeDeferred(self.store.get_current_state_ids, room["room_id"])
|
||||
for room in results
|
||||
],
|
||||
consumeErrors=True,
|
||||
)
|
||||
logger.info("Now %i rooms in result", len(chunk))
|
||||
if len(chunk) >= limit + 1:
|
||||
break
|
||||
|
||||
chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
|
||||
# get rooms' creation state events' IDs
|
||||
room_creation_event_ids = {
|
||||
room["room_id"]: event_ids.get((EventTypes.Create, ""))
|
||||
for (room, event_ids) in zip(results, room_state_ids)
|
||||
}
|
||||
|
||||
# Work out the new limit of the batch for pagination, or None if we
|
||||
# know there are no more results that would be returned.
|
||||
# i.e., [since_token.current_limit..new_limit] is the batch of rooms
|
||||
# we've returned (or the reverse if we paginated backwards)
|
||||
# We tried to pull out limit + 1 rooms above, so if we have <= limit
|
||||
# then we know there are no more results to return
|
||||
new_limit = None
|
||||
if chunk and (not limit or len(chunk) > limit):
|
||||
# get rooms' creation state events
|
||||
creation_events_by_id = yield self.store.get_events(
|
||||
room_creation_event_ids.values()
|
||||
)
|
||||
|
||||
if not since_token or since_token.direction_is_forward:
|
||||
if limit:
|
||||
chunk = chunk[:limit]
|
||||
last_room_id = chunk[-1]["room_id"]
|
||||
else:
|
||||
if limit:
|
||||
chunk = chunk[-limit:]
|
||||
last_room_id = chunk[0]["room_id"]
|
||||
# associate them with the room IDs
|
||||
room_creation_events = {
|
||||
room_id: creation_events_by_id[event_id]
|
||||
for (room_id, event_id) in room_creation_event_ids.items()
|
||||
}
|
||||
|
||||
new_limit = sorted_rooms.index(last_room_id)
|
||||
# now filter out rooms with m.federate: False in their create event
|
||||
results = [
|
||||
room
|
||||
for room in results
|
||||
if room_creation_events[room["room_id"]].content.get("m.federate", True)
|
||||
]
|
||||
|
||||
results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
|
||||
for room in results:
|
||||
# populate search result entries with additional fields, namely
|
||||
# 'aliases' and 'guest_can_join'
|
||||
room_id = room["room_id"]
|
||||
|
||||
if since_token:
|
||||
results["new_rooms"] = bool(newly_visible)
|
||||
aliases = yield self.store.get_aliases_for_room(room_id)
|
||||
if aliases:
|
||||
room["aliases"] = aliases
|
||||
|
||||
if not since_token or since_token.direction_is_forward:
|
||||
if new_limit is not None:
|
||||
results["next_batch"] = RoomListNextBatch(
|
||||
stream_ordering=stream_token,
|
||||
public_room_stream_id=public_room_stream_id,
|
||||
current_limit=new_limit,
|
||||
direction_is_forward=True,
|
||||
).to_token()
|
||||
state_ids = yield self.store.get_current_state_ids(room_id)
|
||||
guests_can_join = False
|
||||
guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
|
||||
if guest_access_state_id is not None:
|
||||
guest_access = yield self.store.get_event(guest_access_state_id)
|
||||
if guest_access is not None:
|
||||
if guest_access.content.get("guest_access") == "can_join":
|
||||
guests_can_join = True
|
||||
room["guest_can_join"] = guests_can_join
|
||||
|
||||
if since_token:
|
||||
results["prev_batch"] = since_token.copy_and_replace(
|
||||
direction_is_forward=False,
|
||||
current_limit=since_token.current_limit + 1,
|
||||
).to_token()
|
||||
else:
|
||||
if new_limit is not None:
|
||||
results["prev_batch"] = RoomListNextBatch(
|
||||
stream_ordering=stream_token,
|
||||
public_room_stream_id=public_room_stream_id,
|
||||
current_limit=new_limit,
|
||||
direction_is_forward=False,
|
||||
).to_token()
|
||||
response["chunk"] = results
|
||||
|
||||
if since_token:
|
||||
results["next_batch"] = since_token.copy_and_replace(
|
||||
direction_is_forward=True,
|
||||
current_limit=since_token.current_limit - 1,
|
||||
).to_token()
|
||||
# TODO for federation, we currently don't remove m.federate=False rooms
|
||||
# from the total room count estimate.
|
||||
response["total_room_count_estimate"] = yield self.store.count_public_rooms()
|
||||
|
||||
defer.returnValue(results)
|
||||
defer.returnValue(response)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _append_room_entry_to_chunk(
|
||||
@@ -560,7 +487,6 @@ class RoomListNextBatch(
|
||||
),
|
||||
)
|
||||
):
|
||||
|
||||
KEY_DICT = {
|
||||
"stream_ordering": "s",
|
||||
"public_room_stream_id": "p",
|
||||
|
||||
@@ -148,26 +148,44 @@ class StatsHandler(StateDeltasHandler):
|
||||
# quantise time to the nearest bucket
|
||||
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "state_events", +1
|
||||
)
|
||||
|
||||
if prev_event_id is None:
|
||||
# this state event doesn't overwrite another,
|
||||
# so it is a new effective/current state event
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "current_state_events", +1
|
||||
)
|
||||
|
||||
if typ == EventTypes.Member:
|
||||
# we could use _get_key_change here but it's a bit inefficient
|
||||
# given we're not testing for a specific result; might as well
|
||||
# just grab the prev_membership and membership strings and
|
||||
# compare them.
|
||||
prev_event_content = {}
|
||||
# We take None rather than leave as a previous membership
|
||||
# in the absence of a previous event because we do not want to
|
||||
# reduce the leave count when a new-to-the-room user joins.
|
||||
prev_membership = None
|
||||
if prev_event_id is not None:
|
||||
prev_event = yield self.store.get_event(
|
||||
prev_event_id, allow_none=True
|
||||
)
|
||||
if prev_event:
|
||||
prev_event_content = prev_event.content
|
||||
prev_membership = prev_event_content.get(
|
||||
"membership", Membership.LEAVE
|
||||
)
|
||||
|
||||
membership = event_content.get("membership", Membership.LEAVE)
|
||||
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
|
||||
|
||||
if prev_membership == membership:
|
||||
continue
|
||||
|
||||
if prev_membership == Membership.JOIN:
|
||||
if prev_membership is None:
|
||||
logger.debug("No previous membership for this user.")
|
||||
elif prev_membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", -1
|
||||
)
|
||||
@@ -246,6 +264,23 @@ class StatsHandler(StateDeltasHandler):
|
||||
},
|
||||
)
|
||||
|
||||
# Also add room stats with just the one state event
|
||||
# (the room creation state event)
|
||||
yield self.store.update_stats(
|
||||
"room",
|
||||
room_id,
|
||||
now,
|
||||
{
|
||||
"bucket_size": self.stats_bucket_size,
|
||||
"current_state_events": 1,
|
||||
"joined_members": 0,
|
||||
"invited_members": 0,
|
||||
"left_members": 0,
|
||||
"banned_members": 0,
|
||||
"state_events": 1,
|
||||
},
|
||||
)
|
||||
|
||||
elif typ == EventTypes.JoinRules:
|
||||
yield self.store.update_room_state(
|
||||
room_id, {"join_rules": event_content.get("join_rule")}
|
||||
|
||||
@@ -332,6 +332,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
|
||||
limit = parse_integer(request, "limit", 0)
|
||||
since_token = parse_string(request, "since", None)
|
||||
|
||||
if limit == 0:
|
||||
# zero is a special value which corresponds to no limit.
|
||||
limit = None
|
||||
|
||||
handler = self.hs.get_room_list_handler()
|
||||
if server:
|
||||
data = yield handler.get_remote_public_room_list(
|
||||
@@ -369,6 +373,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
|
||||
else:
|
||||
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
|
||||
|
||||
if limit == 0:
|
||||
# zero is a special value which corresponds to no limit.
|
||||
limit = None
|
||||
|
||||
handler = self.hs.get_room_list_handler()
|
||||
if server:
|
||||
data = yield handler.get_remote_public_room_list(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -161,6 +162,167 @@ class RoomWorkerStore(SQLBaseStore):
|
||||
"get_public_room_changes", get_public_room_changes_txn
|
||||
)
|
||||
|
||||
def count_public_rooms(self):
|
||||
"""
|
||||
Counts the number of public rooms as tracked in the room_stats and room_state
|
||||
table.
|
||||
A public room is one who has is_public set
|
||||
AND is publicly-joinable and/or world-readable.
|
||||
Returns:
|
||||
number of public rooms on this homeserver's room directory
|
||||
|
||||
"""
|
||||
|
||||
def _count_public_rooms_txn(txn):
|
||||
sql = """
|
||||
SELECT COUNT(*)
|
||||
FROM room_stats
|
||||
JOIN room_state USING (room_id)
|
||||
JOIN rooms USING (room_id)
|
||||
WHERE
|
||||
is_public
|
||||
AND (
|
||||
join_rules = 'public'
|
||||
OR history_visibility = 'world_readable'
|
||||
)
|
||||
"""
|
||||
txn.execute(sql)
|
||||
return txn.fetchone()[0]
|
||||
|
||||
return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_largest_public_rooms(
|
||||
self, network_tuple, search_filter, limit, pagination_token, forwards
|
||||
):
|
||||
"""TODO doc this
|
||||
|
||||
Args:
|
||||
network_tuple (ThirdPartyInstanceID|None):
|
||||
search_filter (dict|None):
|
||||
limit (int|None): Maxmimum number of rows to return, unlimited otherwise.
|
||||
pagination_token (str|None): if present, a room ID which is to be
|
||||
the (first/last) included in the results.
|
||||
forwards (bool): true iff going forwards, going backwards otherwise
|
||||
|
||||
Returns:
|
||||
Rooms in order: biggest number of joined users first.
|
||||
We then arbitrarily use the room_id as a tie breaker.
|
||||
|
||||
"""
|
||||
|
||||
# TODO probably want to use ts_… on Postgres?
|
||||
|
||||
sql = """
|
||||
SELECT
|
||||
room_id, name, topic, canonical_alias, joined_members,
|
||||
avatar, history_visibility, joined_members
|
||||
FROM
|
||||
room_stats
|
||||
JOIN room_state USING (room_id)
|
||||
JOIN rooms USING (room_id)
|
||||
"""
|
||||
query_args = []
|
||||
|
||||
if network_tuple:
|
||||
sql += """
|
||||
LEFT JOIN appservice_room_list arl USING (room_id)
|
||||
"""
|
||||
|
||||
sql += """
|
||||
WHERE
|
||||
is_public
|
||||
AND (
|
||||
join_rules = 'public'
|
||||
OR history_visibility = 'world_readable'
|
||||
)
|
||||
"""
|
||||
|
||||
if pagination_token:
|
||||
pt_joined = yield self._simple_select_one_onecol(
|
||||
table="room_stats",
|
||||
keyvalues={"room_id": pagination_token},
|
||||
retcol="joined_members",
|
||||
desc="get_largest_public_rooms",
|
||||
)
|
||||
|
||||
if forwards:
|
||||
sql += """
|
||||
AND (
|
||||
(joined_members < ?)
|
||||
OR (joined_members = ? AND room_id >= ?)
|
||||
)
|
||||
"""
|
||||
else:
|
||||
sql += """
|
||||
AND (
|
||||
(joined_members > ?)
|
||||
OR (joined_members = ? AND room_id <= ?)
|
||||
)
|
||||
"""
|
||||
query_args += [pt_joined, pt_joined, pagination_token]
|
||||
|
||||
if search_filter and search_filter.get("generic_search_term", None):
|
||||
search_term = "%" + search_filter["generic_search_term"] + "%"
|
||||
sql += """
|
||||
AND (
|
||||
name LIKE ?
|
||||
OR topic LIKE ?
|
||||
OR canonical_alias LIKE ?
|
||||
)
|
||||
"""
|
||||
query_args += [search_term, search_term, search_term]
|
||||
|
||||
if network_tuple:
|
||||
sql += "AND ("
|
||||
if network_tuple.appservice_id:
|
||||
sql += "appservice_id = ? AND "
|
||||
query_args.append(network_tuple.appservice_id)
|
||||
else:
|
||||
sql += "appservice_id IS NULL AND "
|
||||
|
||||
if network_tuple.network_id:
|
||||
sql += "network_id = ?)"
|
||||
query_args.append(network_tuple.network_id)
|
||||
else:
|
||||
sql += "network_id IS NULL)"
|
||||
|
||||
if forwards:
|
||||
sql += """
|
||||
ORDER BY
|
||||
joined_members DESC, room_id ASC
|
||||
"""
|
||||
else:
|
||||
sql += """
|
||||
ORDER BY
|
||||
joined_members ASC, room_id DESC
|
||||
"""
|
||||
|
||||
if limit is not None:
|
||||
# be cautious about SQL injection
|
||||
assert isinstance(limit, int)
|
||||
|
||||
sql += """
|
||||
LIMIT %d
|
||||
""" % (
|
||||
limit,
|
||||
)
|
||||
|
||||
def _get_largest_public_rooms_txn(txn):
|
||||
txn.execute(sql, query_args)
|
||||
|
||||
results = self.cursor_to_dict(txn)
|
||||
|
||||
if not forwards:
|
||||
results.reverse()
|
||||
|
||||
return results
|
||||
|
||||
ret_val = yield self.runInteraction(
|
||||
"get_largest_public_rooms", _get_largest_public_rooms_txn
|
||||
)
|
||||
defer.returnValue(ret_val)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def is_room_blocked(self, room_id):
|
||||
return self._simple_select_one_onecol(
|
||||
|
||||
@@ -68,6 +68,9 @@ class StatsStore(StateDeltasStore):
|
||||
yield self._end_background_update("populate_stats_createtables")
|
||||
defer.returnValue(1)
|
||||
|
||||
# TODO dev only
|
||||
yield self.delete_all_stats()
|
||||
|
||||
# Get all the rooms that we want to process.
|
||||
def _make_staging_area(txn):
|
||||
# Create the temporary tables
|
||||
|
||||
Reference in New Issue
Block a user