Compare commits

...

18 Commits

Author SHA1 Message Date
Richard van der Hoff
b07a33f024 Avoid doing presence updates on replication reconnect
Presence is supposed to be disabled on matrix.org, so we shouldn't send a load
of USER_SYNC commands every time the synchrotron reconnects to the master.
2018-02-15 09:51:09 +00:00
hera
74539aa25b Disable auth on room_members for now
because the moznet bridge is broken (https://github.com/matrix-org/matrix-appservice-irc/issues/506)
2018-02-15 09:51:09 +00:00
Erik Johnston
8f25cd6627 Bump LAST_SEEN_GRANULARITY in client_ips 2018-02-15 09:51:09 +00:00
Erik Johnston
dc1299a4b0 Prefill client_ip_last_seen in replication 2018-02-15 09:51:09 +00:00
Erik Johnston
2c72d66cda Move event sending to end in shutdown room admin api 2018-02-15 09:51:09 +00:00
Erik Johnston
cde90a89ed Add dummy presence REST handler to frontend proxy
The handler no-ops all requests as presence is disabled.
2018-02-15 09:51:09 +00:00
Erik Johnston
5aec53ad95 Don't intern type/state_keys in state store 2018-02-15 09:51:09 +00:00
Erik Johnston
d10d19f0ad Increase store._state_group_cache cache size 2018-02-15 09:51:09 +00:00
Erik Johnston
daec1d77be Make _get_joined_hosts_cache cache non-iterable 2018-02-15 09:51:09 +00:00
Erik Johnston
61885f7849 Increase MAX_EVENTS_BEHIND for replication clients 2018-02-15 09:51:09 +00:00
Erik Johnston
ba30d489d9 Disable presence in txn queue 2018-02-15 09:51:09 +00:00
Erik Johnston
5e2d0650df Handle exceptions in get_hosts_for_room when sending events over federation 2018-02-15 09:51:09 +00:00
Erik Johnston
841bcbcafa Limit concurrent AS joins 2018-02-15 09:51:09 +00:00
Erik Johnston
08a6b88e3d Disable presence
This reverts commit 0ebd376a53 and
disables presence a bit more
2018-02-15 09:51:09 +00:00
Erik Johnston
aece8e73b1 Make push actions rotation configurable 2018-02-15 09:51:09 +00:00
Mark Haines
328bd35e00 Deleting from event_push_actions needs to use an index 2018-02-15 09:51:09 +00:00
Erik Johnston
7de9a28b8e Disable auto search for prefixes in event search 2018-02-15 09:51:09 +00:00
Erik Johnston
2a9c3aea89 Add timeout to ResponseCache of /public_rooms 2018-02-15 09:51:09 +00:00
20 changed files with 119 additions and 55 deletions

View File

@@ -36,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
@@ -49,6 +50,35 @@ from twisted.web.resource import Resource
logger = logging.getLogger("synapse.app.frontend_proxy") logger = logging.getLogger("synapse.app.frontend_proxy")
class PresenceStatusStubServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super(PresenceStatusStubServlet, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.auth = hs.get_auth()
self.main_uri = hs.config.worker_main_http_uri
@defer.inlineCallbacks
def on_GET(self, request, user_id):
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
headers = {
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri,
headers=headers,
)
defer.returnValue((200, result))
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
defer.returnValue((200, {}))
class KeyUploadServlet(RestServlet): class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$") PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -135,6 +165,7 @@ class FrontendProxyServer(HomeServer):
elif name == "client": elif name == "client":
resource = JsonResource(self, canonical_json=False) resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource) KeyUploadServlet(self).register(resource)
PresenceStatusStubServlet(self).register(resource)
resources.update({ resources.update({
"/_matrix/client/r0": resource, "/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource, "/_matrix/client/unstable": resource,

View File

@@ -117,6 +117,7 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id) logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms): def send_user_sync(self, user_id, is_syncing, last_sync_ms):
return
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
def mark_as_coming_online(self, user_id): def mark_as_coming_online(self, user_id):
@@ -214,6 +215,8 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id) yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self): def get_currently_syncing_users(self):
# presence is disabled on matrix.org, so we return the empty set
return set()
return [ return [
user_id for user_id, count in self.user_to_num_current_syncs.iteritems() user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
if count > 0 if count > 0

View File

@@ -184,17 +184,22 @@ class TransactionQueue(object):
if not is_mine and send_on_behalf_of is None: if not is_mine and send_on_behalf_of is None:
continue continue
# Get the state from before the event. try:
# We need to make sure that this is the state from before # Get the state from before the event.
# the event and not from after it. # We need to make sure that this is the state from before
# Otherwise if the last member on a server in a room is # the event and not from after it.
# banned then it won't receive the event because it won't # Otherwise if the last member on a server in a room is
# be in the room after the ban. # banned then it won't receive the event because it won't
destinations = yield self.state.get_current_hosts_in_room( # be in the room after the ban.
event.room_id, latest_event_ids=[ destinations = yield self.state.get_current_hosts_in_room(
prev_id for prev_id, _ in event.prev_events event.room_id, latest_event_ids=[
], prev_id for prev_id, _ in event.prev_events
) ],
)
except Exception:
logger.exception("Failed to calculate hosts in room")
continue
destinations = set(destinations) destinations = set(destinations)
if send_on_behalf_of is not None: if send_on_behalf_of is not None:
@@ -254,6 +259,7 @@ class TransactionQueue(object):
Args: Args:
states (list(UserPresenceState)) states (list(UserPresenceState))
""" """
return
# First we queue up the new presence by user ID, so multiple presence # First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled # updates in quick successtion are correctly handled

View File

@@ -372,6 +372,7 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_presence(): def get_presence():
defer.returnValue([])
states = yield presence_handler.get_states( states = yield presence_handler.get_states(
[m.user_id for m in room_members], [m.user_id for m in room_members],
as_event=True, as_event=True,

View File

@@ -283,7 +283,7 @@ class MessageHandler(BaseHandler):
# If this is an AS, double check that they are allowed to see the members. # If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or becuase there # This can either be because the AS user is in the room or becuase there
# is a user in the room that the AS is "interested in" # is a user in the room that the AS is "interested in"
if requester.app_service and user_id not in users_with_profile: if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile: for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid): if requester.app_service.is_interested_in_user(uid):
break break

View File

@@ -372,6 +372,7 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting """We've seen the user do something that indicates they're interacting
with the app. with the app.
""" """
return
user_id = user.to_string() user_id = user.to_string()
bump_active_time_counter.inc() bump_active_time_counter.inc()
@@ -401,6 +402,7 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual Useful for streams that are not associated with an actual
client that is being used by a user. client that is being used by a user.
""" """
affect_presence = False
if affect_presence: if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0) curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1 self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -443,6 +445,8 @@ class PresenceHandler(object):
Returns: Returns:
set(str): A set of user_id strings. set(str): A set of user_id strings.
""" """
# presence is disabled on matrix.org, so we return the empty set
return set()
syncing_user_ids = { syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items() user_id for user_id, count in self.user_to_num_current_syncs.items()
if count if count
@@ -462,6 +466,7 @@ class PresenceHandler(object):
syncing_user_ids(set(str)): The set of user_ids that are syncing_user_ids(set(str)): The set of user_ids that are
currently syncing on that server. currently syncing on that server.
""" """
return
# Grab the previous list of user_ids that were syncing on that process # Grab the previous list of user_ids that were syncing on that process
prev_syncing_user_ids = ( prev_syncing_user_ids = (

View File

@@ -44,7 +44,7 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler): class RoomListHandler(BaseHandler):
def __init__(self, hs): def __init__(self, hs):
super(RoomListHandler, self).__init__(hs) super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs) self.response_cache = ResponseCache(hs, timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000) self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
def get_local_public_room_list(self, limit=None, since_token=None, def get_local_public_room_list(self, limit=None, since_token=None,

View File

@@ -28,7 +28,7 @@ from synapse.api.constants import (
) )
from synapse.api.errors import AuthError, SynapseError, Codes from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.types import UserID, RoomID from synapse.types import UserID, RoomID
from synapse.util.async import Linearizer from synapse.util.async import Linearizer, Limiter
from synapse.util.distributor import user_left_room, user_joined_room from synapse.util.distributor import user_left_room, user_joined_room
from ._base import BaseHandler from ._base import BaseHandler
@@ -50,6 +50,7 @@ class RoomMemberHandler(BaseHandler):
self.event_creation_hander = hs.get_event_creation_handler() self.event_creation_hander = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member") self.member_linearizer = Linearizer(name="member")
self.member_limiter = Limiter(3)
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker() self.spam_checker = hs.get_spam_checker()
@@ -161,18 +162,23 @@ class RoomMemberHandler(BaseHandler):
): ):
key = (room_id,) key = (room_id,)
with (yield self.member_linearizer.queue(key)): as_id = object()
result = yield self._update_membership( if requester.app_service:
requester, as_id = requester.app_service.id
target,
room_id, with (yield self.member_limiter.queue(as_id)):
action, with (yield self.member_linearizer.queue(key)):
txn_id=txn_id, result = yield self._update_membership(
remote_room_hosts=remote_room_hosts, requester,
third_party_signed=third_party_signed, target,
ratelimit=ratelimit, room_id,
content=content, action,
) txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
)
defer.returnValue(result) defer.returnValue(result)

View File

@@ -585,7 +585,7 @@ class SyncHandler(object):
since_token is None and since_token is None and
sync_config.filter_collection.blocks_all_presence() sync_config.filter_collection.blocks_all_presence()
) )
if not block_all_presence_data: if False and not block_all_presence_data:
yield self._generate_sync_entry_for_presence( yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users sync_result_builder, newly_joined_rooms, newly_joined_users
) )

View File

@@ -42,6 +42,8 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return return
self.client_ip_last_seen.prefill(key, now)
self.hs.get_tcp_replication().send_user_ip( self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now user_id, access_token, ip, user_agent, device_id, now
) )

View File

@@ -33,7 +33,7 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000 MAX_EVENTS_BEHIND = 500000
EventStreamRow = namedtuple("EventStreamRow", ( EventStreamRow = namedtuple("EventStreamRow", (

View File

@@ -212,17 +212,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
) )
new_room_id = info["room_id"] new_room_id = info["room_id"]
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
requester_user_id = requester.user.to_string() requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id) logger.info("Shutting down room %r", room_id)
@@ -260,6 +249,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
kicked_users.append(user_id) kicked_users.append(user_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
aliases_for_room = yield self.store.get_aliases_for_room(room_id) aliases_for_room = yield self.store.get_aliases_for_room(room_id)
yield self.store.update_aliases_for_room( yield self.store.update_aliases_for_room(

View File

@@ -81,7 +81,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
except Exception: except Exception:
raise SynapseError(400, "Unable to parse state") raise SynapseError(400, "Unable to parse state")
yield self.presence_handler.set_state(user, state) # yield self.presence_handler.set_state(user, state)
defer.returnValue((200, {})) defer.returnValue((200, {}))

View File

@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller # Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits # times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes # 120 seconds == 2 minutes
LAST_SEEN_GRANULARITY = 120 * 1000 LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore): class ClientIpStore(background_updates.BackgroundUpdateStore):

View File

@@ -87,6 +87,8 @@ class EventPushActionsStore(SQLBaseStore):
self._rotate_notif_loop = self._clock.looping_call( self._rotate_notif_loop = self._clock.looping_call(
self._rotate_notifs, 30 * 60 * 1000 self._rotate_notifs, 30 * 60 * 1000
) )
self._rotate_delay = 3
self._rotate_count = 10000
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
""" """
@@ -629,7 +631,7 @@ class EventPushActionsStore(SQLBaseStore):
) )
if caught_up: if caught_up:
break break
yield sleep(5) yield sleep(self._rotate_delay)
finally: finally:
self._doing_notif_rotation = False self._doing_notif_rotation = False
@@ -650,8 +652,8 @@ class EventPushActionsStore(SQLBaseStore):
txn.execute(""" txn.execute("""
SELECT stream_ordering FROM event_push_actions SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ? WHERE stream_ordering > ?
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000 ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
""", (old_rotate_stream_ordering,)) """, (old_rotate_stream_ordering, self._rotate_count))
stream_row = txn.fetchone() stream_row = txn.fetchone()
if stream_row: if stream_row:
offset_stream_ordering, = stream_row offset_stream_ordering, = stream_row

View File

@@ -1043,7 +1043,6 @@ class EventsStore(SQLBaseStore):
"event_edge_hashes", "event_edge_hashes",
"event_edges", "event_edges",
"event_forward_extremities", "event_forward_extremities",
"event_push_actions",
"event_reference_hashes", "event_reference_hashes",
"event_search", "event_search",
"event_signatures", "event_signatures",
@@ -1063,6 +1062,14 @@ class EventsStore(SQLBaseStore):
[(ev.event_id,) for ev, _ in events_and_contexts] [(ev.event_id,) for ev, _ in events_and_contexts]
) )
for table in (
"event_push_actions",
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts]
)
def _store_event_txn(self, txn, events_and_contexts): def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables """Insert new events into the event and event_json tables

View File

@@ -675,7 +675,7 @@ class RoomMemberStore(SQLBaseStore):
defer.returnValue(result) defer.returnValue(result)
@cached(max_entries=10000, iterable=True) @cached(max_entries=10000)
def _get_joined_hosts_cache(self, room_id): def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id) return _JoinedHostsCache(self, room_id)

View File

@@ -719,7 +719,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine): if isinstance(database_engine, PostgresEngine):
return " & ".join(result + ":*" for result in results) return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine): elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results) return " & ".join(result + "*" for result in results)
else: else:

View File

@@ -54,7 +54,7 @@ class StateGroupWorkerStore(SQLBaseStore):
super(StateGroupWorkerStore, self).__init__(db_conn, hs) super(StateGroupWorkerStore, self).__init__(db_conn, hs)
self._state_group_cache = DictionaryCache( self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR "*stateGroupCache*", 500000 * CACHE_SIZE_FACTOR
) )
@cached(max_entries=100000, iterable=True) @cached(max_entries=100000, iterable=True)
@@ -532,8 +532,7 @@ class StateGroupWorkerStore(SQLBaseStore):
state_dict = results[group] state_dict = results[group]
state_dict.update( state_dict.update(
((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) group_state_dict.iteritems()
for k, v in group_state_dict.iteritems()
) )
self._state_group_cache.update( self._state_group_cache.update(

View File

@@ -984,11 +984,13 @@ class RoomInitialSyncTestCase(RestTestCase):
self.assertTrue("presence" in response) self.assertTrue("presence" in response)
presence_by_user = { # presence is turned off on hotfixes
e["content"]["user_id"]: e for e in response["presence"]
} # presence_by_user = {
self.assertTrue(self.user_id in presence_by_user) # e["content"]["user_id"]: e for e in response["presence"]
self.assertEquals("m.presence", presence_by_user[self.user_id]["type"]) # }
# self.assertTrue(self.user_id in presence_by_user)
# self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
class RoomMessageListTestCase(RestTestCase): class RoomMessageListTestCase(RestTestCase):