Compare commits

...

3 Commits

Author SHA1 Message Date
Erik Johnston
86090eadb0 Don't send outlier events to ASes 2018-04-06 10:21:17 +01:00
Erik Johnston
edbeed06ca Fix MRO for replication stores 2018-04-06 10:20:50 +01:00
Erik Johnston
277d2c506d Add cache for if ASes have users in a room 2018-04-05 17:28:27 +01:00
9 changed files with 178 additions and 41 deletions

View File

@@ -42,7 +42,7 @@ logger = logging.getLogger("synapse.app.appservice")
class AppserviceSlaveStore( class AppserviceSlaveStore(
DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore, DirectoryStore, SlavedApplicationServiceStore, SlavedEventStore,
SlavedRegistrationStore, SlavedRegistrationStore,
): ):
pass pass

View File

@@ -50,11 +50,11 @@ logger = logging.getLogger("synapse.app.client_reader")
class ClientReaderSlavedStore( class ClientReaderSlavedStore(
SlavedApplicationServiceStore,
SlavedEventStore, SlavedEventStore,
SlavedKeyStore, SlavedKeyStore,
RoomStore, RoomStore,
DirectoryStore, DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore, SlavedRegistrationStore,
TransactionStore, TransactionStore,
SlavedClientIpStore, SlavedClientIpStore,

View File

@@ -64,6 +64,7 @@ logger = logging.getLogger("synapse.app.synchrotron")
class SynchrotronSlavedStore( class SynchrotronSlavedStore(
SlavedReceiptsStore, SlavedReceiptsStore,
SlavedAccountDataStore, SlavedAccountDataStore,
SlavedPushRuleStore,
SlavedApplicationServiceStore, SlavedApplicationServiceStore,
SlavedRegistrationStore, SlavedRegistrationStore,
SlavedFilteringStore, SlavedFilteringStore,
@@ -71,7 +72,6 @@ class SynchrotronSlavedStore(
SlavedGroupServerStore, SlavedGroupServerStore,
SlavedDeviceInboxStore, SlavedDeviceInboxStore,
SlavedDeviceStore, SlavedDeviceStore,
SlavedPushRuleStore,
SlavedEventStore, SlavedEventStore,
SlavedClientIpStore, SlavedClientIpStore,
RoomStore, RoomStore,

View File

@@ -49,8 +49,8 @@ logger = logging.getLogger("synapse.app.user_dir")
class UserDirectorySlaveStore( class UserDirectorySlaveStore(
SlavedEventStore,
SlavedApplicationServiceStore, SlavedApplicationServiceStore,
SlavedEventStore,
SlavedRegistrationStore, SlavedRegistrationStore,
SlavedClientIpStore, SlavedClientIpStore,
UserDirectoryStore, UserDirectoryStore,

View File

@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import GroupID, get_domain_from_id from synapse.types import GroupID, get_domain_from_id
from twisted.internet import defer from twisted.internet import defer
@@ -173,6 +172,7 @@ class ApplicationService(object):
if self.is_interested_in_user(event.sender): if self.is_interested_in_user(event.sender):
defer.returnValue(True) defer.returnValue(True)
# also check m.room.member state key # also check m.room.member state key
if (event.type == EventTypes.Member and if (event.type == EventTypes.Member and
self.is_interested_in_user(event.state_key)): self.is_interested_in_user(event.state_key)):
@@ -181,20 +181,18 @@ class ApplicationService(object):
if not store: if not store:
defer.returnValue(False) defer.returnValue(False)
does_match = yield self._matches_user_in_member_list(event.room_id, store) does_match = yield self._matches_user_in_member_list(
event, store,
)
defer.returnValue(does_match) defer.returnValue(does_match)
@cachedInlineCallbacks(num_args=1, cache_context=True) @defer.inlineCallbacks
def _matches_user_in_member_list(self, room_id, store, cache_context): def _matches_user_in_member_list(self, event, store):
member_list = yield store.get_users_in_room( ases = yield store.get_appservices_with_user_in_room(
room_id, on_invalidate=cache_context.invalidate event,
) )
# check joined member events defer.returnValue(self.id in ases)
for user_id in member_list:
if self.is_interested_in_user(user_id):
defer.returnValue(True)
defer.returnValue(False)
def _matches_room_id(self, event): def _matches_room_id(self, event):
if hasattr(event, "room_id"): if hasattr(event, "room_id"):

View File

@@ -17,8 +17,10 @@
from synapse.storage.appservice import ( from synapse.storage.appservice import (
ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
) )
from synapse.replication.slave.storage.events import SlavedEventStore
class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore, class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
ApplicationServiceWorkerStore): ApplicationServiceWorkerStore,
SlavedEventStore):
pass pass

View File

@@ -18,9 +18,14 @@ import re
import simplejson as json import simplejson as json
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.appservice import AppServiceTransaction from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices from synapse.config.appservice import load_appservices
from synapse.storage.events import EventsWorkerStore from synapse.storage.events import EventsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.async import Linearizer
from ._base import SQLBaseStore from ._base import SQLBaseStore
@@ -46,7 +51,8 @@ def _make_exclusive_regex(services_cache):
return exclusive_user_regex return exclusive_user_regex
class ApplicationServiceWorkerStore(SQLBaseStore): class ApplicationServiceWorkerStore(RoomMemberWorkerStore, StateGroupWorkerStore,
SQLBaseStore):
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
self.services_cache = load_appservices( self.services_cache = load_appservices(
hs.hostname, hs.hostname,
@@ -111,6 +117,38 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
return service return service
return None return None
@defer.inlineCallbacks
def get_appservices_with_user_in_room(self, event):
"""Get the list of appservices in the room at the given event
Args:
event (Event)
Returns:
Deferred[set(str)]: The IDs of all ASes in the room
"""
state_group = yield self._get_state_group_for_event(event.event_id)
if not state_group:
raise Exception("No state group for event %s", event.event_id)
ases_in_room = yield self._get_appservices_with_user_in_room(
event.room_id, state_group,
)
defer.returnValue(ases_in_room)
@cachedInlineCallbacks(num_args=2, max_entries=10000)
def _get_appservices_with_user_in_room(self, room_id, state_group):
cache = self._get_appservices_with_user_in_room_cache(room_id)
ases_in_room = yield cache.get_appservices_in_room_by_user(state_group)
defer.returnValue(ases_in_room)
@cached(max_entries=10000)
def _get_appservices_with_user_in_room_cache(self, room_id):
return _AppserviceUsersCache(self, room_id)
class ApplicationServiceStore(ApplicationServiceWorkerStore): class ApplicationServiceStore(ApplicationServiceWorkerStore):
# This is currently empty due to there not being any AS storage functions # This is currently empty due to there not being any AS storage functions
@@ -346,6 +384,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
" (SELECT stream_ordering FROM appservice_stream_position)" " (SELECT stream_ordering FROM appservice_stream_position)"
" < e.stream_ordering" " < e.stream_ordering"
" AND e.stream_ordering <= ?" " AND e.stream_ordering <= ?"
" AND NOT e.outlier"
" ORDER BY e.stream_ordering ASC" " ORDER BY e.stream_ordering ASC"
" LIMIT ?" " LIMIT ?"
) )
@@ -374,3 +413,119 @@ class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStor
# to keep consistency with the other stores, we keep this empty class for # to keep consistency with the other stores, we keep this empty class for
# now. # now.
pass pass
class _AppserviceUsersCache(object):
"""Attempts to calculate which appservices have users in a given room by
looking at state groups and their delta_ids
"""
def __init__(self, store, room_id):
self.store = store
self.room_id = room_id
self.linearizer = Linearizer("_AppserviceUsersCache")
# The last state group we calculated the ASes in the room for.
self.state_group = object()
# A dict of all appservices in the room at the above state group,
# along with a user_id of an AS user in the room.
# Dict of as_id -> user_id.
self.appservices_in_room = {}
@defer.inlineCallbacks
def get_appservices_in_room_by_user(self, state_group):
"""
Args:
state_group(str)
Returns:
Deferred[set(str)]: The IDs of all ASes in the room
"""
assert state_group is not None
if state_group == self.state_group:
defer.returnValue(frozenset(self.appservices_in_room))
with (yield self.linearizer.queue(())):
# Set of ASes that we need to recalculate their membership of
# the room
uhandled_ases = set()
# If the state groups match then there is nothing to do
if state_group == self.state_group:
defer.returnValue(frozenset(self.appservices_in_room))
prev_group, delta_ids = yield self.store.get_state_group_delta(state_group)
# If the prev_group matches the last state group we can calculate
# the new value by looking at the deltas
if prev_group and prev_group == self.state_group:
for (typ, state_key), event_id in delta_ids.iteritems():
if typ != EventTypes.Member:
continue
user_id = state_key
event = yield self.store.get_event(event_id)
is_join = event.membership == Membership.JOIN
for appservice in self.store.get_app_services():
as_id = appservice.id
# If this is a join and the appservice is already in
# the room then its a noop
if is_join:
if as_id in self.appservices_in_room:
continue
# If this is not a join, then we only need to recalculate
# if the AS is in the room and the cached joined AS user
# matches this event.
elif self.appservices_in_room.get(as_id, None) != user_id:
continue
# If the AS is not interested in the user then its a
# noop.
if not appservice.is_interested_in_user(user_id):
continue
if is_join:
# If an AS user is joining then the AS is now
# interested in the room
self.appservices_in_room[as_id] = user_id
else:
# If an AS user has left then we need to
# recalcualte if they're in the room.
uhandled_ases.add(appservice)
self.appservices_in_room.pop(as_id, None)
else:
uhandled_ases = set(self.store.get_app_services())
if uhandled_ases:
# We need to recalculate which ASes are in the room, so lets
# get the current state and try and find a join event
# that the AS is interested in.
current_state_ids = yield self.store.get_state_ids_for_group(state_group)
for appservice in uhandled_ases:
as_id = appservice.id
self.appservices_in_room.pop(as_id, None)
for (etype, state_key), event_id in current_state_ids.iteritems():
if etype != EventTypes.Member:
continue
if not appservice.is_interested_in_user(state_key):
continue
event = yield self.store.get_event(event_id)
if event.membership == Membership.JOIN:
self.appservices_in_room[as_id] = state_key
break
self.state_group = state_group
defer.returnValue(frozenset(self.appservices_in_room))

View File

@@ -440,7 +440,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
) )
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
# @defer.inlineCallbacks
def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry): def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
# We don't use `state_group`, its there so that we can cache based # We don't use `state_group`, its there so that we can cache based
# on it. However, its important that its never None, since two current_state's # on it. However, its important that its never None, since two current_state's

View File

@@ -55,7 +55,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("@irc_.*") _regex("@irc_.*")
) )
self.event.sender = "@irc_foobar:matrix.org" self.event.sender = "@irc_foobar:matrix.org"
self.assertTrue((yield self.service.is_interested(self.event))) self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks @defer.inlineCallbacks
def test_regex_user_id_prefix_no_match(self): def test_regex_user_id_prefix_no_match(self):
@@ -63,7 +63,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("@irc_.*") _regex("@irc_.*")
) )
self.event.sender = "@someone_else:matrix.org" self.event.sender = "@someone_else:matrix.org"
self.assertFalse((yield self.service.is_interested(self.event))) self.assertFalse((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks @defer.inlineCallbacks
def test_regex_room_member_is_checked(self): def test_regex_room_member_is_checked(self):
@@ -73,7 +73,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
self.event.sender = "@someone_else:matrix.org" self.event.sender = "@someone_else:matrix.org"
self.event.type = "m.room.member" self.event.type = "m.room.member"
self.event.state_key = "@irc_foobar:matrix.org" self.event.state_key = "@irc_foobar:matrix.org"
self.assertTrue((yield self.service.is_interested(self.event))) self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks @defer.inlineCallbacks
def test_regex_room_id_match(self): def test_regex_room_id_match(self):
@@ -81,7 +81,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("!some_prefix.*some_suffix:matrix.org") _regex("!some_prefix.*some_suffix:matrix.org")
) )
self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org" self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
self.assertTrue((yield self.service.is_interested(self.event))) self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks @defer.inlineCallbacks
def test_regex_room_id_no_match(self): def test_regex_room_id_no_match(self):
@@ -89,7 +89,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("!some_prefix.*some_suffix:matrix.org") _regex("!some_prefix.*some_suffix:matrix.org")
) )
self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org" self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
self.assertFalse((yield self.service.is_interested(self.event))) self.assertFalse((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks @defer.inlineCallbacks
def test_regex_alias_match(self): def test_regex_alias_match(self):
@@ -160,7 +160,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
self.store.get_aliases_for_room.return_value = [ self.store.get_aliases_for_room.return_value = [
"#xmpp_foobar:matrix.org", "#athing:matrix.org" "#xmpp_foobar:matrix.org", "#athing:matrix.org"
] ]
self.store.get_users_in_room.return_value = [] self.store.get_appservices_with_user_in_room.return_value = []
self.assertFalse((yield self.service.is_interested( self.assertFalse((yield self.service.is_interested(
self.event, self.store self.event, self.store
))) )))
@@ -193,20 +193,3 @@ class ApplicationServiceTestCase(unittest.TestCase):
} }
self.event.state_key = self.service.sender self.event.state_key = self.service.sender
self.assertTrue((yield self.service.is_interested(self.event))) self.assertTrue((yield self.service.is_interested(self.event)))
@defer.inlineCallbacks
def test_member_list_match(self):
self.service.namespaces[ApplicationService.NS_USERS].append(
_regex("@irc_.*")
)
self.store.get_users_in_room.return_value = [
"@alice:here",
"@irc_fo:here", # AS user
"@bob:here",
]
self.store.get_aliases_for_room.return_value = []
self.event.sender = "@xmpp_foobar:matrix.org"
self.assertTrue((yield self.service.is_interested(
event=self.event, store=self.store
)))