mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-13 01:50:46 +00:00
Compare commits
3 Commits
patch-1
...
erikj/as_u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
86090eadb0 | ||
|
|
edbeed06ca | ||
|
|
277d2c506d |
@@ -42,7 +42,7 @@ logger = logging.getLogger("synapse.app.appservice")
|
|||||||
|
|
||||||
|
|
||||||
class AppserviceSlaveStore(
|
class AppserviceSlaveStore(
|
||||||
DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
|
DirectoryStore, SlavedApplicationServiceStore, SlavedEventStore,
|
||||||
SlavedRegistrationStore,
|
SlavedRegistrationStore,
|
||||||
):
|
):
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -50,11 +50,11 @@ logger = logging.getLogger("synapse.app.client_reader")
|
|||||||
|
|
||||||
|
|
||||||
class ClientReaderSlavedStore(
|
class ClientReaderSlavedStore(
|
||||||
|
SlavedApplicationServiceStore,
|
||||||
SlavedEventStore,
|
SlavedEventStore,
|
||||||
SlavedKeyStore,
|
SlavedKeyStore,
|
||||||
RoomStore,
|
RoomStore,
|
||||||
DirectoryStore,
|
DirectoryStore,
|
||||||
SlavedApplicationServiceStore,
|
|
||||||
SlavedRegistrationStore,
|
SlavedRegistrationStore,
|
||||||
TransactionStore,
|
TransactionStore,
|
||||||
SlavedClientIpStore,
|
SlavedClientIpStore,
|
||||||
|
|||||||
@@ -64,6 +64,7 @@ logger = logging.getLogger("synapse.app.synchrotron")
|
|||||||
class SynchrotronSlavedStore(
|
class SynchrotronSlavedStore(
|
||||||
SlavedReceiptsStore,
|
SlavedReceiptsStore,
|
||||||
SlavedAccountDataStore,
|
SlavedAccountDataStore,
|
||||||
|
SlavedPushRuleStore,
|
||||||
SlavedApplicationServiceStore,
|
SlavedApplicationServiceStore,
|
||||||
SlavedRegistrationStore,
|
SlavedRegistrationStore,
|
||||||
SlavedFilteringStore,
|
SlavedFilteringStore,
|
||||||
@@ -71,7 +72,6 @@ class SynchrotronSlavedStore(
|
|||||||
SlavedGroupServerStore,
|
SlavedGroupServerStore,
|
||||||
SlavedDeviceInboxStore,
|
SlavedDeviceInboxStore,
|
||||||
SlavedDeviceStore,
|
SlavedDeviceStore,
|
||||||
SlavedPushRuleStore,
|
|
||||||
SlavedEventStore,
|
SlavedEventStore,
|
||||||
SlavedClientIpStore,
|
SlavedClientIpStore,
|
||||||
RoomStore,
|
RoomStore,
|
||||||
|
|||||||
@@ -49,8 +49,8 @@ logger = logging.getLogger("synapse.app.user_dir")
|
|||||||
|
|
||||||
|
|
||||||
class UserDirectorySlaveStore(
|
class UserDirectorySlaveStore(
|
||||||
SlavedEventStore,
|
|
||||||
SlavedApplicationServiceStore,
|
SlavedApplicationServiceStore,
|
||||||
|
SlavedEventStore,
|
||||||
SlavedRegistrationStore,
|
SlavedRegistrationStore,
|
||||||
SlavedClientIpStore,
|
SlavedClientIpStore,
|
||||||
UserDirectoryStore,
|
UserDirectoryStore,
|
||||||
|
|||||||
@@ -13,7 +13,6 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
|
||||||
from synapse.types import GroupID, get_domain_from_id
|
from synapse.types import GroupID, get_domain_from_id
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
@@ -173,6 +172,7 @@ class ApplicationService(object):
|
|||||||
|
|
||||||
if self.is_interested_in_user(event.sender):
|
if self.is_interested_in_user(event.sender):
|
||||||
defer.returnValue(True)
|
defer.returnValue(True)
|
||||||
|
|
||||||
# also check m.room.member state key
|
# also check m.room.member state key
|
||||||
if (event.type == EventTypes.Member and
|
if (event.type == EventTypes.Member and
|
||||||
self.is_interested_in_user(event.state_key)):
|
self.is_interested_in_user(event.state_key)):
|
||||||
@@ -181,20 +181,18 @@ class ApplicationService(object):
|
|||||||
if not store:
|
if not store:
|
||||||
defer.returnValue(False)
|
defer.returnValue(False)
|
||||||
|
|
||||||
does_match = yield self._matches_user_in_member_list(event.room_id, store)
|
does_match = yield self._matches_user_in_member_list(
|
||||||
|
event, store,
|
||||||
|
)
|
||||||
defer.returnValue(does_match)
|
defer.returnValue(does_match)
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=1, cache_context=True)
|
@defer.inlineCallbacks
|
||||||
def _matches_user_in_member_list(self, room_id, store, cache_context):
|
def _matches_user_in_member_list(self, event, store):
|
||||||
member_list = yield store.get_users_in_room(
|
ases = yield store.get_appservices_with_user_in_room(
|
||||||
room_id, on_invalidate=cache_context.invalidate
|
event,
|
||||||
)
|
)
|
||||||
|
|
||||||
# check joined member events
|
defer.returnValue(self.id in ases)
|
||||||
for user_id in member_list:
|
|
||||||
if self.is_interested_in_user(user_id):
|
|
||||||
defer.returnValue(True)
|
|
||||||
defer.returnValue(False)
|
|
||||||
|
|
||||||
def _matches_room_id(self, event):
|
def _matches_room_id(self, event):
|
||||||
if hasattr(event, "room_id"):
|
if hasattr(event, "room_id"):
|
||||||
|
|||||||
@@ -17,8 +17,10 @@
|
|||||||
from synapse.storage.appservice import (
|
from synapse.storage.appservice import (
|
||||||
ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
|
ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
|
||||||
)
|
)
|
||||||
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
|
|
||||||
|
|
||||||
class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
|
class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
|
||||||
ApplicationServiceWorkerStore):
|
ApplicationServiceWorkerStore,
|
||||||
|
SlavedEventStore):
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -18,9 +18,14 @@ import re
|
|||||||
import simplejson as json
|
import simplejson as json
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.appservice import AppServiceTransaction
|
from synapse.appservice import AppServiceTransaction
|
||||||
from synapse.config.appservice import load_appservices
|
from synapse.config.appservice import load_appservices
|
||||||
from synapse.storage.events import EventsWorkerStore
|
from synapse.storage.events import EventsWorkerStore
|
||||||
|
from synapse.storage.roommember import RoomMemberWorkerStore
|
||||||
|
from synapse.storage.state import StateGroupWorkerStore
|
||||||
|
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||||
|
from synapse.util.async import Linearizer
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
|
|
||||||
|
|
||||||
@@ -46,7 +51,8 @@ def _make_exclusive_regex(services_cache):
|
|||||||
return exclusive_user_regex
|
return exclusive_user_regex
|
||||||
|
|
||||||
|
|
||||||
class ApplicationServiceWorkerStore(SQLBaseStore):
|
class ApplicationServiceWorkerStore(RoomMemberWorkerStore, StateGroupWorkerStore,
|
||||||
|
SQLBaseStore):
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
self.services_cache = load_appservices(
|
self.services_cache = load_appservices(
|
||||||
hs.hostname,
|
hs.hostname,
|
||||||
@@ -111,6 +117,38 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
|
|||||||
return service
|
return service
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_appservices_with_user_in_room(self, event):
|
||||||
|
"""Get the list of appservices in the room at the given event
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event (Event)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[set(str)]: The IDs of all ASes in the room
|
||||||
|
"""
|
||||||
|
state_group = yield self._get_state_group_for_event(event.event_id)
|
||||||
|
|
||||||
|
if not state_group:
|
||||||
|
raise Exception("No state group for event %s", event.event_id)
|
||||||
|
|
||||||
|
ases_in_room = yield self._get_appservices_with_user_in_room(
|
||||||
|
event.room_id, state_group,
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue(ases_in_room)
|
||||||
|
|
||||||
|
@cachedInlineCallbacks(num_args=2, max_entries=10000)
|
||||||
|
def _get_appservices_with_user_in_room(self, room_id, state_group):
|
||||||
|
cache = self._get_appservices_with_user_in_room_cache(room_id)
|
||||||
|
ases_in_room = yield cache.get_appservices_in_room_by_user(state_group)
|
||||||
|
|
||||||
|
defer.returnValue(ases_in_room)
|
||||||
|
|
||||||
|
@cached(max_entries=10000)
|
||||||
|
def _get_appservices_with_user_in_room_cache(self, room_id):
|
||||||
|
return _AppserviceUsersCache(self, room_id)
|
||||||
|
|
||||||
|
|
||||||
class ApplicationServiceStore(ApplicationServiceWorkerStore):
|
class ApplicationServiceStore(ApplicationServiceWorkerStore):
|
||||||
# This is currently empty due to there not being any AS storage functions
|
# This is currently empty due to there not being any AS storage functions
|
||||||
@@ -346,6 +384,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
|
|||||||
" (SELECT stream_ordering FROM appservice_stream_position)"
|
" (SELECT stream_ordering FROM appservice_stream_position)"
|
||||||
" < e.stream_ordering"
|
" < e.stream_ordering"
|
||||||
" AND e.stream_ordering <= ?"
|
" AND e.stream_ordering <= ?"
|
||||||
|
" AND NOT e.outlier"
|
||||||
" ORDER BY e.stream_ordering ASC"
|
" ORDER BY e.stream_ordering ASC"
|
||||||
" LIMIT ?"
|
" LIMIT ?"
|
||||||
)
|
)
|
||||||
@@ -374,3 +413,119 @@ class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStor
|
|||||||
# to keep consistency with the other stores, we keep this empty class for
|
# to keep consistency with the other stores, we keep this empty class for
|
||||||
# now.
|
# now.
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class _AppserviceUsersCache(object):
|
||||||
|
"""Attempts to calculate which appservices have users in a given room by
|
||||||
|
looking at state groups and their delta_ids
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, store, room_id):
|
||||||
|
self.store = store
|
||||||
|
self.room_id = room_id
|
||||||
|
|
||||||
|
self.linearizer = Linearizer("_AppserviceUsersCache")
|
||||||
|
|
||||||
|
# The last state group we calculated the ASes in the room for.
|
||||||
|
self.state_group = object()
|
||||||
|
|
||||||
|
# A dict of all appservices in the room at the above state group,
|
||||||
|
# along with a user_id of an AS user in the room.
|
||||||
|
# Dict of as_id -> user_id.
|
||||||
|
self.appservices_in_room = {}
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_appservices_in_room_by_user(self, state_group):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
state_group(str)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[set(str)]: The IDs of all ASes in the room
|
||||||
|
"""
|
||||||
|
assert state_group is not None
|
||||||
|
|
||||||
|
if state_group == self.state_group:
|
||||||
|
defer.returnValue(frozenset(self.appservices_in_room))
|
||||||
|
|
||||||
|
with (yield self.linearizer.queue(())):
|
||||||
|
# Set of ASes that we need to recalculate their membership of
|
||||||
|
# the room
|
||||||
|
uhandled_ases = set()
|
||||||
|
|
||||||
|
# If the state groups match then there is nothing to do
|
||||||
|
if state_group == self.state_group:
|
||||||
|
defer.returnValue(frozenset(self.appservices_in_room))
|
||||||
|
|
||||||
|
prev_group, delta_ids = yield self.store.get_state_group_delta(state_group)
|
||||||
|
|
||||||
|
# If the prev_group matches the last state group we can calculate
|
||||||
|
# the new value by looking at the deltas
|
||||||
|
if prev_group and prev_group == self.state_group:
|
||||||
|
for (typ, state_key), event_id in delta_ids.iteritems():
|
||||||
|
if typ != EventTypes.Member:
|
||||||
|
continue
|
||||||
|
|
||||||
|
user_id = state_key
|
||||||
|
|
||||||
|
event = yield self.store.get_event(event_id)
|
||||||
|
|
||||||
|
is_join = event.membership == Membership.JOIN
|
||||||
|
for appservice in self.store.get_app_services():
|
||||||
|
as_id = appservice.id
|
||||||
|
|
||||||
|
# If this is a join and the appservice is already in
|
||||||
|
# the room then its a noop
|
||||||
|
if is_join:
|
||||||
|
if as_id in self.appservices_in_room:
|
||||||
|
continue
|
||||||
|
# If this is not a join, then we only need to recalculate
|
||||||
|
# if the AS is in the room and the cached joined AS user
|
||||||
|
# matches this event.
|
||||||
|
elif self.appservices_in_room.get(as_id, None) != user_id:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# If the AS is not interested in the user then its a
|
||||||
|
# noop.
|
||||||
|
if not appservice.is_interested_in_user(user_id):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if is_join:
|
||||||
|
# If an AS user is joining then the AS is now
|
||||||
|
# interested in the room
|
||||||
|
self.appservices_in_room[as_id] = user_id
|
||||||
|
else:
|
||||||
|
# If an AS user has left then we need to
|
||||||
|
# recalcualte if they're in the room.
|
||||||
|
uhandled_ases.add(appservice)
|
||||||
|
self.appservices_in_room.pop(as_id, None)
|
||||||
|
else:
|
||||||
|
uhandled_ases = set(self.store.get_app_services())
|
||||||
|
|
||||||
|
if uhandled_ases:
|
||||||
|
# We need to recalculate which ASes are in the room, so lets
|
||||||
|
# get the current state and try and find a join event
|
||||||
|
# that the AS is interested in.
|
||||||
|
|
||||||
|
current_state_ids = yield self.store.get_state_ids_for_group(state_group)
|
||||||
|
|
||||||
|
for appservice in uhandled_ases:
|
||||||
|
as_id = appservice.id
|
||||||
|
|
||||||
|
self.appservices_in_room.pop(as_id, None)
|
||||||
|
|
||||||
|
for (etype, state_key), event_id in current_state_ids.iteritems():
|
||||||
|
if etype != EventTypes.Member:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not appservice.is_interested_in_user(state_key):
|
||||||
|
continue
|
||||||
|
|
||||||
|
event = yield self.store.get_event(event_id)
|
||||||
|
if event.membership == Membership.JOIN:
|
||||||
|
self.appservices_in_room[as_id] = state_key
|
||||||
|
break
|
||||||
|
|
||||||
|
self.state_group = state_group
|
||||||
|
|
||||||
|
defer.returnValue(frozenset(self.appservices_in_room))
|
||||||
|
|||||||
@@ -440,7 +440,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||||||
)
|
)
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
|
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
|
||||||
# @defer.inlineCallbacks
|
|
||||||
def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
|
def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
|
||||||
# We don't use `state_group`, its there so that we can cache based
|
# We don't use `state_group`, its there so that we can cache based
|
||||||
# on it. However, its important that its never None, since two current_state's
|
# on it. However, its important that its never None, since two current_state's
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||||||
_regex("@irc_.*")
|
_regex("@irc_.*")
|
||||||
)
|
)
|
||||||
self.event.sender = "@irc_foobar:matrix.org"
|
self.event.sender = "@irc_foobar:matrix.org"
|
||||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_regex_user_id_prefix_no_match(self):
|
def test_regex_user_id_prefix_no_match(self):
|
||||||
@@ -63,7 +63,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||||||
_regex("@irc_.*")
|
_regex("@irc_.*")
|
||||||
)
|
)
|
||||||
self.event.sender = "@someone_else:matrix.org"
|
self.event.sender = "@someone_else:matrix.org"
|
||||||
self.assertFalse((yield self.service.is_interested(self.event)))
|
self.assertFalse((yield self.service.is_interested(self.event, None)))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_regex_room_member_is_checked(self):
|
def test_regex_room_member_is_checked(self):
|
||||||
@@ -73,7 +73,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||||||
self.event.sender = "@someone_else:matrix.org"
|
self.event.sender = "@someone_else:matrix.org"
|
||||||
self.event.type = "m.room.member"
|
self.event.type = "m.room.member"
|
||||||
self.event.state_key = "@irc_foobar:matrix.org"
|
self.event.state_key = "@irc_foobar:matrix.org"
|
||||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_regex_room_id_match(self):
|
def test_regex_room_id_match(self):
|
||||||
@@ -81,7 +81,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||||||
_regex("!some_prefix.*some_suffix:matrix.org")
|
_regex("!some_prefix.*some_suffix:matrix.org")
|
||||||
)
|
)
|
||||||
self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
|
self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
|
||||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_regex_room_id_no_match(self):
|
def test_regex_room_id_no_match(self):
|
||||||
@@ -89,7 +89,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||||||
_regex("!some_prefix.*some_suffix:matrix.org")
|
_regex("!some_prefix.*some_suffix:matrix.org")
|
||||||
)
|
)
|
||||||
self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
|
self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
|
||||||
self.assertFalse((yield self.service.is_interested(self.event)))
|
self.assertFalse((yield self.service.is_interested(self.event, None)))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_regex_alias_match(self):
|
def test_regex_alias_match(self):
|
||||||
@@ -160,7 +160,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||||||
self.store.get_aliases_for_room.return_value = [
|
self.store.get_aliases_for_room.return_value = [
|
||||||
"#xmpp_foobar:matrix.org", "#athing:matrix.org"
|
"#xmpp_foobar:matrix.org", "#athing:matrix.org"
|
||||||
]
|
]
|
||||||
self.store.get_users_in_room.return_value = []
|
self.store.get_appservices_with_user_in_room.return_value = []
|
||||||
self.assertFalse((yield self.service.is_interested(
|
self.assertFalse((yield self.service.is_interested(
|
||||||
self.event, self.store
|
self.event, self.store
|
||||||
)))
|
)))
|
||||||
@@ -193,20 +193,3 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||||||
}
|
}
|
||||||
self.event.state_key = self.service.sender
|
self.event.state_key = self.service.sender
|
||||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def test_member_list_match(self):
|
|
||||||
self.service.namespaces[ApplicationService.NS_USERS].append(
|
|
||||||
_regex("@irc_.*")
|
|
||||||
)
|
|
||||||
self.store.get_users_in_room.return_value = [
|
|
||||||
"@alice:here",
|
|
||||||
"@irc_fo:here", # AS user
|
|
||||||
"@bob:here",
|
|
||||||
]
|
|
||||||
self.store.get_aliases_for_room.return_value = []
|
|
||||||
|
|
||||||
self.event.sender = "@xmpp_foobar:matrix.org"
|
|
||||||
self.assertTrue((yield self.service.is_interested(
|
|
||||||
event=self.event, store=self.store
|
|
||||||
)))
|
|
||||||
|
|||||||
Reference in New Issue
Block a user