mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-07 01:20:16 +00:00
Compare commits
300 Commits
erikj/dock
...
v1.10.0rc4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6eae548a7 | ||
|
|
e439438b9b | ||
|
|
f8a1e0d1d2 | ||
|
|
8a29def84a | ||
|
|
77a166577a | ||
|
|
7d5268d37c | ||
|
|
c854d255e5 | ||
|
|
c660962d4d | ||
|
|
767bef0033 | ||
|
|
4d02bfd6e1 | ||
|
|
a099ab7d38 | ||
|
|
ce72a9ccdb | ||
|
|
bace86ed15 | ||
|
|
45bf455948 | ||
|
|
859663565c | ||
|
|
0876a5b641 | ||
|
|
5b5314ee41 | ||
|
|
aff9189149 | ||
|
|
2eda49a8db | ||
|
|
96b17d4e4f | ||
|
|
aadc131dc1 | ||
|
|
0a522121a0 | ||
|
|
0b5e2c8093 | ||
|
|
c665d154a2 | ||
|
|
31295b5a60 | ||
|
|
aebe20c452 | ||
|
|
508e0f9310 | ||
|
|
e04e7e830e | ||
|
|
5407e69732 | ||
|
|
2c59eb368c | ||
|
|
6d1a3e2bdd | ||
|
|
7fa4586e36 | ||
|
|
33b4aa8d99 | ||
|
|
627cf5def8 | ||
|
|
b409d51dee | ||
|
|
4a4e620f30 | ||
|
|
28889d8da5 | ||
|
|
15b2a50817 | ||
|
|
b852a8247d | ||
|
|
7b55cca011 | ||
|
|
a9577ab1f4 | ||
|
|
cb217d5d60 | ||
|
|
f4f5355bcf | ||
|
|
23bb2713d2 | ||
|
|
b2471e1109 | ||
|
|
610219d53d | ||
|
|
b464afe283 | ||
|
|
7657ad3ced | ||
|
|
721086a291 | ||
|
|
6e6b53ed3a | ||
|
|
601b50672d | ||
|
|
a7af389da0 | ||
|
|
99db0d76fd | ||
|
|
561b0f79bc | ||
|
|
8569f3cdef | ||
|
|
7b61e6f5d6 | ||
|
|
05241b3031 | ||
|
|
e01026d84d | ||
|
|
ee91c69ef7 | ||
|
|
e0eef47315 | ||
|
|
44d2ca2990 | ||
|
|
9240622c1a | ||
|
|
0dbba85e95 | ||
|
|
1ceeccb769 | ||
|
|
39883e85bd | ||
|
|
68f53b7a0e | ||
|
|
e679b008ff | ||
|
|
e80a5b7492 | ||
|
|
b272e7345f | ||
|
|
a81e0233e9 | ||
|
|
80898481ab | ||
|
|
9d4c716d85 | ||
|
|
d90b0946ed | ||
|
|
8d5762b0dc | ||
|
|
a7efbc5416 | ||
|
|
be362cb8f8 | ||
|
|
873ff9522b | ||
|
|
c1ee2999a0 | ||
|
|
9b2b386f76 | ||
|
|
65fe31786d | ||
|
|
70b6d1dfd6 | ||
|
|
ee62aed72e | ||
|
|
c02f26319d | ||
|
|
fdd182870c | ||
|
|
4102cb220a | ||
|
|
5299707329 | ||
|
|
43e01be158 | ||
|
|
589e080c6b | ||
|
|
24e48bc9ff | ||
|
|
576b62a6a3 | ||
|
|
ad2ba70959 | ||
|
|
a330505025 | ||
|
|
67b73fd147 | ||
|
|
c08e4dbadc | ||
|
|
6dbd498772 | ||
|
|
03b09b32d6 | ||
|
|
8f1711da0e | ||
|
|
6fb6c98f71 | ||
|
|
aad993f24d | ||
|
|
544e101c24 | ||
|
|
8699f380f0 | ||
|
|
e91a68ef3a | ||
|
|
9f5048c198 | ||
|
|
b3c40ba58a | ||
|
|
8d69193a42 | ||
|
|
bbcd19f2d0 | ||
|
|
3cd598135f | ||
|
|
1c8f2c34ff | ||
|
|
ca03f90ee7 | ||
|
|
9feee29d76 | ||
|
|
e7dcee13da | ||
|
|
7467738834 | ||
|
|
d75fb8ae22 | ||
|
|
ae25a8efef | ||
|
|
fc5be50d56 | ||
|
|
aadba440da | ||
|
|
ec94d6a590 | ||
|
|
42ce90c3f7 | ||
|
|
8467756dc1 | ||
|
|
613b443ff0 | ||
|
|
233b61ac61 | ||
|
|
f41c9d37d6 | ||
|
|
1048e2ca6a | ||
|
|
ce0ce1add3 | ||
|
|
b0bf1ea7bd | ||
|
|
2561b628af | ||
|
|
73c6630718 | ||
|
|
a189bb03ab | ||
|
|
404a2d70be | ||
|
|
ed8ccc3737 | ||
|
|
18b1a92162 | ||
|
|
199aa72d35 | ||
|
|
8f7dbbc14a | ||
|
|
27dbc9ac42 | ||
|
|
e9aa401994 | ||
|
|
9e9572c79e | ||
|
|
c7285607a3 | ||
|
|
a6e2546980 | ||
|
|
dc510e0e43 | ||
|
|
ed12338f35 | ||
|
|
bf3f8b8855 | ||
|
|
67acd1aa1b | ||
|
|
75c924430e | ||
|
|
6087c53830 | ||
|
|
b50fe65a22 | ||
|
|
17009e689b | ||
|
|
5d2f755d3f | ||
|
|
8d7c0264bc | ||
|
|
000d230901 | ||
|
|
eb0334b07c | ||
|
|
4d07dc0d18 | ||
|
|
0ea52872ab | ||
|
|
6868d53fe9 | ||
|
|
68af15637b | ||
|
|
4da63d9f6f | ||
|
|
085d69b0bd | ||
|
|
776fe6c184 | ||
|
|
0e07d2c7d5 | ||
|
|
90ec885805 | ||
|
|
5a28154c4d | ||
|
|
2fcb51e703 | ||
|
|
26f524872f | ||
|
|
88af0317a2 | ||
|
|
c10c71e70d | ||
|
|
93555af5c9 | ||
|
|
06622e4110 | ||
|
|
155efa9e36 | ||
|
|
3175edc5d8 | ||
|
|
d95252c01f | ||
|
|
5bd2e2c31d | ||
|
|
84528e4fb2 | ||
|
|
e4381ed514 | ||
|
|
d9235b9e29 | ||
|
|
ce5f3b1ba5 | ||
|
|
7b5c04312e | ||
|
|
f5bafd70f4 | ||
|
|
d97c3a6ce6 | ||
|
|
341c35614a | ||
|
|
fecf28319c | ||
|
|
345d8cfb69 | ||
|
|
b60d005156 | ||
|
|
6c232a69df | ||
|
|
e97c1df30c | ||
|
|
decb5698b3 | ||
|
|
62962e30e4 | ||
|
|
05413d4e20 | ||
|
|
ca46dcf683 | ||
|
|
d351be1567 | ||
|
|
c7f2eaf4f4 | ||
|
|
53d25116df | ||
|
|
08e25ffa0c | ||
|
|
1c148e442b | ||
|
|
acaca1b4e9 | ||
|
|
4777836b83 | ||
|
|
7da659dd6d | ||
|
|
77dfe51aba | ||
|
|
ef7865e2f2 | ||
|
|
5cb15c0443 | ||
|
|
b43172ffbc | ||
|
|
b4796d1814 | ||
|
|
482d06774a | ||
|
|
046d731fbd | ||
|
|
892f6c98ec | ||
|
|
7fafa2d954 | ||
|
|
1d63046542 | ||
|
|
4c238a9a91 | ||
|
|
002db39a36 | ||
|
|
c4074e4ab6 | ||
|
|
7960e814e5 | ||
|
|
080025e533 | ||
|
|
9accd63a38 | ||
|
|
3dd704ee9a | ||
|
|
28e28a1974 | ||
|
|
b699178aa1 | ||
|
|
c08c649fa1 | ||
|
|
5c0c4b4079 | ||
|
|
b55cdfaa31 | ||
|
|
34406cf22c | ||
|
|
f91aefd245 | ||
|
|
f8281f42c8 | ||
|
|
7171bdf279 | ||
|
|
9f2d14ee26 | ||
|
|
ead471e72d | ||
|
|
9a4011de46 | ||
|
|
33551be61b | ||
|
|
eeb29d99fd | ||
|
|
1a0c407e6b | ||
|
|
c4b37cbf18 | ||
|
|
7fa156af80 | ||
|
|
78825f4f1c | ||
|
|
6e15b5debe | ||
|
|
2e0d2879d0 | ||
|
|
128043072b | ||
|
|
b2fda9d20e | ||
|
|
3c8c5eabc2 | ||
|
|
2da2041e2e | ||
|
|
b5eef203f4 | ||
|
|
df73da691f | ||
|
|
30d054e0bb | ||
|
|
ebb3cc4ab6 | ||
|
|
17201abd53 | ||
|
|
2f141f4c41 | ||
|
|
638c0bf49b | ||
|
|
d1065e6f51 | ||
|
|
567863127a | ||
|
|
f5abc10724 | ||
|
|
bb795b56da | ||
|
|
4dd0604f61 | ||
|
|
c05d278ba0 | ||
|
|
49a3163958 | ||
|
|
1a568041fa | ||
|
|
c9db8b0c32 | ||
|
|
aa1bf10b91 | ||
|
|
5222907bea | ||
|
|
e1eb147f2a | ||
|
|
e43eb47c5f | ||
|
|
27eb4c45cd | ||
|
|
b136d7ff8f | ||
|
|
9e56e1ab30 | ||
|
|
742f757337 | ||
|
|
2f5dfe299c | ||
|
|
e4eec87c6a | ||
|
|
f793ff4571 | ||
|
|
195aae2f16 | ||
|
|
7c79f2cb72 | ||
|
|
f04e35c170 | ||
|
|
36bbac05bd | ||
|
|
e2a4b7681e | ||
|
|
957944eee4 | ||
|
|
bf425e533e | ||
|
|
ca21957b8a | ||
|
|
6a95270671 | ||
|
|
82781f5838 | ||
|
|
aae6d3ff69 | ||
|
|
9175225adf | ||
|
|
7a32fa0101 | ||
|
|
d46450195b | ||
|
|
c0128c1021 | ||
|
|
3320b7c9a4 | ||
|
|
4c22c9b0b6 | ||
|
|
6d6ea1bb40 | ||
|
|
9e38981ae4 | ||
|
|
463e7c2709 | ||
|
|
ce9d0b1d0c | ||
|
|
80786d5caf | ||
|
|
e18378c3e2 | ||
|
|
0ca2857baa | ||
|
|
e21c312e16 | ||
|
|
1031bd25f8 | ||
|
|
fae708c0e8 | ||
|
|
8f8ea91eef | ||
|
|
7a1406d144 | ||
|
|
6373874833 | ||
|
|
a79823e64b | ||
|
|
1766a5fdc0 | ||
|
|
e6b1ea3eb2 | ||
|
|
e5537cf983 | ||
|
|
43bb12e640 | ||
|
|
66dcbf47a3 | ||
|
|
a285fe05fd |
1
changelog.d/6126.feature
Normal file
1
changelog.d/6126.feature
Normal file
@@ -0,0 +1 @@
|
||||
Group events into larger federation transactions at times of high traffic.
|
||||
1
changelog.d/6418.bugfix
Normal file
1
changelog.d/6418.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix phone home stats reporting.
|
||||
1
changelog.d/6866.feature
Normal file
1
changelog.d/6866.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add ability to run some group APIs on workers.
|
||||
1
changelog.d/6873.feature
Normal file
1
changelog.d/6873.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add ability to route federation user device queries to workers.
|
||||
@@ -176,9 +176,15 @@ endpoints matching the following regular expressions:
|
||||
^/_matrix/federation/v1/query_auth/
|
||||
^/_matrix/federation/v1/event_auth/
|
||||
^/_matrix/federation/v1/exchange_third_party_invite/
|
||||
^/_matrix/federation/v1/user/devices/
|
||||
^/_matrix/federation/v1/send/
|
||||
^/_matrix/federation/v1/get_groups_publicised$
|
||||
^/_matrix/key/v2/query
|
||||
|
||||
Additionally, the following REST endpoints can be handled for GET requests:
|
||||
|
||||
^/_matrix/federation/v1/groups/
|
||||
|
||||
The above endpoints should all be routed to the federation_reader worker by the
|
||||
reverse-proxy configuration.
|
||||
|
||||
@@ -254,10 +260,13 @@ following regular expressions:
|
||||
^/_matrix/client/(api/v1|r0|unstable)/keys/changes$
|
||||
^/_matrix/client/versions$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/get_groups_publicised$
|
||||
|
||||
Additionally, the following REST endpoints can be handled for GET requests:
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/groups/.*$
|
||||
|
||||
Additionally, the following REST endpoints can be handled, but all requests must
|
||||
be routed to the same instance:
|
||||
|
||||
@@ -57,6 +57,7 @@ from synapse.rest.client.v1.room import (
|
||||
RoomStateRestServlet,
|
||||
)
|
||||
from synapse.rest.client.v1.voip import VoipRestServlet
|
||||
from synapse.rest.client.v2_alpha import groups
|
||||
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
|
||||
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
|
||||
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
|
||||
@@ -124,6 +125,8 @@ class ClientReaderServer(HomeServer):
|
||||
PushRuleRestServlet(self).register(resource)
|
||||
VersionsRestServlet(self).register(resource)
|
||||
|
||||
groups.register_servlets(self, resource)
|
||||
|
||||
resources.update({"/_matrix/client": resource})
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
@@ -33,8 +33,10 @@ from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
|
||||
from synapse.replication.slave.storage.keys import SlavedKeyStore
|
||||
from synapse.replication.slave.storage.profile import SlavedProfileStore
|
||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||
@@ -66,6 +68,8 @@ class FederationReaderSlavedStore(
|
||||
SlavedEventStore,
|
||||
SlavedKeyStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedGroupServerStore,
|
||||
SlavedDeviceStore,
|
||||
RoomStore,
|
||||
DirectoryStore,
|
||||
SlavedTransactionStore,
|
||||
|
||||
@@ -82,6 +82,8 @@ class FederationServer(FederationBase):
|
||||
self.handler = hs.get_handlers().federation_handler
|
||||
self.state = hs.get_state_handler()
|
||||
|
||||
self.device_handler = hs.get_device_handler()
|
||||
|
||||
self._server_linearizer = Linearizer("fed_server")
|
||||
self._transaction_linearizer = Linearizer("fed_txn_handler")
|
||||
|
||||
@@ -528,8 +530,9 @@ class FederationServer(FederationBase):
|
||||
def on_query_client_keys(self, origin, content):
|
||||
return self.on_query_request("client_keys", content)
|
||||
|
||||
def on_query_user_devices(self, origin, user_id):
|
||||
return self.on_query_request("user_devices", user_id)
|
||||
async def on_query_user_devices(self, origin: str, user_id: str):
|
||||
keys = await self.device_handler.on_federation_query_user_devices(user_id)
|
||||
return 200, keys
|
||||
|
||||
@trace
|
||||
async def on_claim_client_keys(self, origin, content):
|
||||
|
||||
@@ -153,9 +153,24 @@ class FederationSender(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _process_event_queue_loop(self):
|
||||
loop_start_time = self.clock.time_msec()
|
||||
try:
|
||||
self._is_processing = True
|
||||
while True:
|
||||
# if we've been going around this loop for a long time without
|
||||
# catching up, deprioritise transaction transmission. This should mean
|
||||
# that events get batched into fewer transactions, which is more
|
||||
# efficient, and hence give us a chance to catch up
|
||||
if (
|
||||
self.clock.time_msec() - loop_start_time > 60 * 1000
|
||||
and not self._transaction_manager.deprioritise_transmission
|
||||
):
|
||||
logger.warning(
|
||||
"Event queue is getting behind: deprioritising transaction "
|
||||
"transmission"
|
||||
)
|
||||
self._transaction_manager.deprioritise_transmission = True
|
||||
|
||||
last_token = yield self.store.get_federation_out_pos("events")
|
||||
next_token, events = yield self.store.get_all_new_events_stream(
|
||||
last_token, self._last_poked_id, limit=100
|
||||
@@ -253,6 +268,9 @@ class FederationSender(object):
|
||||
|
||||
finally:
|
||||
self._is_processing = False
|
||||
if self._transaction_manager.deprioritise_transmission:
|
||||
logger.info("Event queue caught up: re-prioritising transmission")
|
||||
self._transaction_manager.deprioritise_transmission = False
|
||||
|
||||
def _send_pdu(self, pdu, destinations):
|
||||
# We loop through all destinations to see whether we already have
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# limitations under the License.
|
||||
import datetime
|
||||
import logging
|
||||
import random
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
@@ -37,6 +38,8 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
||||
# This is defined in the Matrix spec and enforced by the receiver.
|
||||
MAX_EDUS_PER_TRANSACTION = 100
|
||||
|
||||
DEPRIORITISE_SLEEP_TIME = 10
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -190,6 +193,18 @@ class PerDestinationQueue(object):
|
||||
|
||||
pending_pdus = []
|
||||
while True:
|
||||
if self._transaction_manager.deprioritise_transmission:
|
||||
# if the event-processing loop has got behind, sleep to give it
|
||||
# a chance to catch up. Add some randomness so that the transmitters
|
||||
# don't all wake up in sync.
|
||||
sleeptime = random.uniform(
|
||||
DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2
|
||||
)
|
||||
logger.info(
|
||||
"TX [%s]: sleeping for %f seconds", self._destination, sleeptime
|
||||
)
|
||||
yield self._clock.sleep(sleeptime)
|
||||
|
||||
# We have to keep 2 free slots for presence and rr_edus
|
||||
limit = MAX_EDUS_PER_TRANSACTION - 2
|
||||
|
||||
|
||||
@@ -49,6 +49,10 @@ class TransactionManager(object):
|
||||
# HACK to get unique tx id
|
||||
self._next_txn_id = int(self.clock.time_msec())
|
||||
|
||||
# the federation sender sometimes sets this to delay transaction transmission,
|
||||
# if the sender gets behind.
|
||||
self.deprioritise_transmission = False
|
||||
|
||||
@measure_func("_send_new_transaction")
|
||||
@defer.inlineCallbacks
|
||||
def send_new_transaction(self, destination, pending_pdus, pending_edus):
|
||||
|
||||
@@ -152,7 +152,7 @@ class TransportLayerClient(object):
|
||||
# generated by the json_data_callback.
|
||||
json_data = transaction.get_dict()
|
||||
|
||||
path = _create_v1_path("/send/%s", transaction.transaction_id)
|
||||
path = _create_v1_path("/send/%s/", transaction.transaction_id)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
transaction.destination,
|
||||
@@ -161,7 +161,7 @@ class TransportLayerClient(object):
|
||||
json_data_callback=json_data_callback,
|
||||
long_retries=True,
|
||||
backoff_on_404=True, # If we get a 404 the other side has gone
|
||||
try_trailing_slash_on_400=True,
|
||||
# try_trailing_slash_on_400=True,
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
@@ -36,7 +36,7 @@ logger = logging.getLogger(__name__)
|
||||
# TODO: Flairs
|
||||
|
||||
|
||||
class GroupsServerHandler(object):
|
||||
class GroupsServerWorkerHandler(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.store = hs.get_datastore()
|
||||
@@ -51,9 +51,6 @@ class GroupsServerHandler(object):
|
||||
self.transport_client = hs.get_federation_transport_client()
|
||||
self.profile_handler = hs.get_profile_handler()
|
||||
|
||||
# Ensure attestations get renewed
|
||||
hs.get_groups_attestation_renewer()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_group_is_ours(
|
||||
self, group_id, requester_user_id, and_exists=False, and_is_admin=None
|
||||
@@ -167,68 +164,6 @@ class GroupsServerHandler(object):
|
||||
"user": membership_info,
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_group_summary_room(
|
||||
self, group_id, requester_user_id, room_id, category_id, content
|
||||
):
|
||||
"""Add/update a room to the group summary
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
RoomID.from_string(room_id) # Ensure valid room id
|
||||
|
||||
order = content.get("order", None)
|
||||
|
||||
is_public = _parse_visibility_from_contents(content)
|
||||
|
||||
yield self.store.add_room_to_summary(
|
||||
group_id=group_id,
|
||||
room_id=room_id,
|
||||
category_id=category_id,
|
||||
order=order,
|
||||
is_public=is_public,
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_group_summary_room(
|
||||
self, group_id, requester_user_id, room_id, category_id
|
||||
):
|
||||
"""Remove a room from the summary
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
yield self.store.remove_room_from_summary(
|
||||
group_id=group_id, room_id=room_id, category_id=category_id
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_group_join_policy(self, group_id, requester_user_id, content):
|
||||
"""Sets the group join policy.
|
||||
|
||||
Currently supported policies are:
|
||||
- "invite": an invite must be received and accepted in order to join.
|
||||
- "open": anyone can join.
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
join_policy = _parse_join_policy_from_contents(content)
|
||||
if join_policy is None:
|
||||
raise SynapseError(400, "No value specified for 'm.join_policy'")
|
||||
|
||||
yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_categories(self, group_id, requester_user_id):
|
||||
"""Get all categories in a group (as seen by user)
|
||||
@@ -248,42 +183,10 @@ class GroupsServerHandler(object):
|
||||
group_id=group_id, category_id=category_id
|
||||
)
|
||||
|
||||
logger.info("group %s", res)
|
||||
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_group_category(self, group_id, requester_user_id, category_id, content):
|
||||
"""Add/Update a group category
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
is_public = _parse_visibility_from_contents(content)
|
||||
profile = content.get("profile")
|
||||
|
||||
yield self.store.upsert_group_category(
|
||||
group_id=group_id,
|
||||
category_id=category_id,
|
||||
is_public=is_public,
|
||||
profile=profile,
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_group_category(self, group_id, requester_user_id, category_id):
|
||||
"""Delete a group category
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
yield self.store.remove_group_category(
|
||||
group_id=group_id, category_id=category_id
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_roles(self, group_id, requester_user_id):
|
||||
"""Get all roles in a group (as seen by user)
|
||||
@@ -302,74 +205,6 @@ class GroupsServerHandler(object):
|
||||
res = yield self.store.get_group_role(group_id=group_id, role_id=role_id)
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_group_role(self, group_id, requester_user_id, role_id, content):
|
||||
"""Add/update a role in a group
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
is_public = _parse_visibility_from_contents(content)
|
||||
|
||||
profile = content.get("profile")
|
||||
|
||||
yield self.store.upsert_group_role(
|
||||
group_id=group_id, role_id=role_id, is_public=is_public, profile=profile
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_group_role(self, group_id, requester_user_id, role_id):
|
||||
"""Remove role from group
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
yield self.store.remove_group_role(group_id=group_id, role_id=role_id)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_group_summary_user(
|
||||
self, group_id, requester_user_id, user_id, role_id, content
|
||||
):
|
||||
"""Add/update a users entry in the group summary
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
order = content.get("order", None)
|
||||
|
||||
is_public = _parse_visibility_from_contents(content)
|
||||
|
||||
yield self.store.add_user_to_summary(
|
||||
group_id=group_id,
|
||||
user_id=user_id,
|
||||
role_id=role_id,
|
||||
order=order,
|
||||
is_public=is_public,
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id):
|
||||
"""Remove a user from the group summary
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
yield self.store.remove_user_from_summary(
|
||||
group_id=group_id, user_id=user_id, role_id=role_id
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_profile(self, group_id, requester_user_id):
|
||||
"""Get the group profile as seen by requester_user_id
|
||||
@@ -394,24 +229,6 @@ class GroupsServerHandler(object):
|
||||
else:
|
||||
raise SynapseError(404, "Unknown group")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_group_profile(self, group_id, requester_user_id, content):
|
||||
"""Update the group profile
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
profile = {}
|
||||
for keyname in ("name", "avatar_url", "short_description", "long_description"):
|
||||
if keyname in content:
|
||||
value = content[keyname]
|
||||
if not isinstance(value, string_types):
|
||||
raise SynapseError(400, "%r value is not a string" % (keyname,))
|
||||
profile[keyname] = value
|
||||
|
||||
yield self.store.update_group_profile(group_id, profile)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_users_in_group(self, group_id, requester_user_id):
|
||||
"""Get the users in group as seen by requester_user_id.
|
||||
@@ -530,6 +347,196 @@ class GroupsServerHandler(object):
|
||||
|
||||
return {"chunk": chunk, "total_room_count_estimate": len(room_results)}
|
||||
|
||||
|
||||
class GroupsServerHandler(GroupsServerWorkerHandler):
|
||||
def __init__(self, hs):
|
||||
super(GroupsServerHandler, self).__init__(hs)
|
||||
|
||||
# Ensure attestations get renewed
|
||||
hs.get_groups_attestation_renewer()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_group_summary_room(
|
||||
self, group_id, requester_user_id, room_id, category_id, content
|
||||
):
|
||||
"""Add/update a room to the group summary
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
RoomID.from_string(room_id) # Ensure valid room id
|
||||
|
||||
order = content.get("order", None)
|
||||
|
||||
is_public = _parse_visibility_from_contents(content)
|
||||
|
||||
yield self.store.add_room_to_summary(
|
||||
group_id=group_id,
|
||||
room_id=room_id,
|
||||
category_id=category_id,
|
||||
order=order,
|
||||
is_public=is_public,
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_group_summary_room(
|
||||
self, group_id, requester_user_id, room_id, category_id
|
||||
):
|
||||
"""Remove a room from the summary
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
yield self.store.remove_room_from_summary(
|
||||
group_id=group_id, room_id=room_id, category_id=category_id
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_group_join_policy(self, group_id, requester_user_id, content):
|
||||
"""Sets the group join policy.
|
||||
|
||||
Currently supported policies are:
|
||||
- "invite": an invite must be received and accepted in order to join.
|
||||
- "open": anyone can join.
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
join_policy = _parse_join_policy_from_contents(content)
|
||||
if join_policy is None:
|
||||
raise SynapseError(400, "No value specified for 'm.join_policy'")
|
||||
|
||||
yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_group_category(self, group_id, requester_user_id, category_id, content):
|
||||
"""Add/Update a group category
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
is_public = _parse_visibility_from_contents(content)
|
||||
profile = content.get("profile")
|
||||
|
||||
yield self.store.upsert_group_category(
|
||||
group_id=group_id,
|
||||
category_id=category_id,
|
||||
is_public=is_public,
|
||||
profile=profile,
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_group_category(self, group_id, requester_user_id, category_id):
|
||||
"""Delete a group category
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
yield self.store.remove_group_category(
|
||||
group_id=group_id, category_id=category_id
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_group_role(self, group_id, requester_user_id, role_id, content):
|
||||
"""Add/update a role in a group
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
is_public = _parse_visibility_from_contents(content)
|
||||
|
||||
profile = content.get("profile")
|
||||
|
||||
yield self.store.upsert_group_role(
|
||||
group_id=group_id, role_id=role_id, is_public=is_public, profile=profile
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_group_role(self, group_id, requester_user_id, role_id):
|
||||
"""Remove role from group
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
yield self.store.remove_group_role(group_id=group_id, role_id=role_id)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_group_summary_user(
|
||||
self, group_id, requester_user_id, user_id, role_id, content
|
||||
):
|
||||
"""Add/update a users entry in the group summary
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
order = content.get("order", None)
|
||||
|
||||
is_public = _parse_visibility_from_contents(content)
|
||||
|
||||
yield self.store.add_user_to_summary(
|
||||
group_id=group_id,
|
||||
user_id=user_id,
|
||||
role_id=role_id,
|
||||
order=order,
|
||||
is_public=is_public,
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id):
|
||||
"""Remove a user from the group summary
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
yield self.store.remove_user_from_summary(
|
||||
group_id=group_id, user_id=user_id, role_id=role_id
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_group_profile(self, group_id, requester_user_id, content):
|
||||
"""Update the group profile
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
profile = {}
|
||||
for keyname in ("name", "avatar_url", "short_description", "long_description"):
|
||||
if keyname in content:
|
||||
value = content[keyname]
|
||||
if not isinstance(value, string_types):
|
||||
raise SynapseError(400, "%r value is not a string" % (keyname,))
|
||||
profile[keyname] = value
|
||||
|
||||
yield self.store.update_group_profile(group_id, profile)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_room_to_group(self, group_id, requester_user_id, room_id, content):
|
||||
"""Add room to group
|
||||
|
||||
@@ -225,6 +225,22 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_federation_query_user_devices(self, user_id):
|
||||
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
|
||||
master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
|
||||
self_signing_key = yield self.store.get_e2e_cross_signing_key(
|
||||
user_id, "self_signing"
|
||||
)
|
||||
|
||||
return {
|
||||
"user_id": user_id,
|
||||
"stream_id": stream_id,
|
||||
"devices": devices,
|
||||
"master_key": master_key,
|
||||
"self_signing_key": self_signing_key,
|
||||
}
|
||||
|
||||
|
||||
class DeviceHandler(DeviceWorkerHandler):
|
||||
def __init__(self, hs):
|
||||
@@ -239,9 +255,6 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
federation_registry.register_edu_handler(
|
||||
"m.device_list_update", self.device_list_updater.incoming_device_list_update
|
||||
)
|
||||
federation_registry.register_query_handler(
|
||||
"user_devices", self.on_federation_query_user_devices
|
||||
)
|
||||
|
||||
hs.get_distributor().observe("user_left_room", self.user_left_room)
|
||||
|
||||
@@ -456,22 +469,6 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
|
||||
self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_federation_query_user_devices(self, user_id):
|
||||
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
|
||||
master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
|
||||
self_signing_key = yield self.store.get_e2e_cross_signing_key(
|
||||
user_id, "self_signing"
|
||||
)
|
||||
|
||||
return {
|
||||
"user_id": user_id,
|
||||
"stream_id": stream_id,
|
||||
"devices": devices,
|
||||
"master_key": master_key,
|
||||
"self_signing_key": self_signing_key,
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def user_left_room(self, user, room_id):
|
||||
user_id = user.to_string()
|
||||
|
||||
@@ -63,7 +63,7 @@ def _create_rerouter(func_name):
|
||||
return f
|
||||
|
||||
|
||||
class GroupsLocalHandler(object):
|
||||
class GroupsLocalWorkerHandler(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.store = hs.get_datastore()
|
||||
@@ -81,40 +81,17 @@ class GroupsLocalHandler(object):
|
||||
|
||||
self.profile_handler = hs.get_profile_handler()
|
||||
|
||||
# Ensure attestations get renewed
|
||||
hs.get_groups_attestation_renewer()
|
||||
|
||||
# The following functions merely route the query to the local groups server
|
||||
# or federation depending on if the group is local or remote
|
||||
|
||||
get_group_profile = _create_rerouter("get_group_profile")
|
||||
update_group_profile = _create_rerouter("update_group_profile")
|
||||
get_rooms_in_group = _create_rerouter("get_rooms_in_group")
|
||||
|
||||
get_invited_users_in_group = _create_rerouter("get_invited_users_in_group")
|
||||
|
||||
add_room_to_group = _create_rerouter("add_room_to_group")
|
||||
update_room_in_group = _create_rerouter("update_room_in_group")
|
||||
remove_room_from_group = _create_rerouter("remove_room_from_group")
|
||||
|
||||
update_group_summary_room = _create_rerouter("update_group_summary_room")
|
||||
delete_group_summary_room = _create_rerouter("delete_group_summary_room")
|
||||
|
||||
update_group_category = _create_rerouter("update_group_category")
|
||||
delete_group_category = _create_rerouter("delete_group_category")
|
||||
get_group_category = _create_rerouter("get_group_category")
|
||||
get_group_categories = _create_rerouter("get_group_categories")
|
||||
|
||||
update_group_summary_user = _create_rerouter("update_group_summary_user")
|
||||
delete_group_summary_user = _create_rerouter("delete_group_summary_user")
|
||||
|
||||
update_group_role = _create_rerouter("update_group_role")
|
||||
delete_group_role = _create_rerouter("delete_group_role")
|
||||
get_group_role = _create_rerouter("get_group_role")
|
||||
get_group_roles = _create_rerouter("get_group_roles")
|
||||
|
||||
set_group_join_policy = _create_rerouter("set_group_join_policy")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_summary(self, group_id, requester_user_id):
|
||||
"""Get the group summary for a group.
|
||||
@@ -169,6 +146,144 @@ class GroupsLocalHandler(object):
|
||||
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_users_in_group(self, group_id, requester_user_id):
|
||||
"""Get users in a group
|
||||
"""
|
||||
if self.is_mine_id(group_id):
|
||||
res = yield self.groups_server_handler.get_users_in_group(
|
||||
group_id, requester_user_id
|
||||
)
|
||||
return res
|
||||
|
||||
group_server_name = get_domain_from_id(group_id)
|
||||
|
||||
try:
|
||||
res = yield self.transport_client.get_users_in_group(
|
||||
get_domain_from_id(group_id), group_id, requester_user_id
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse_error()
|
||||
except RequestSendFailed:
|
||||
raise SynapseError(502, "Failed to contact group server")
|
||||
|
||||
chunk = res["chunk"]
|
||||
valid_entries = []
|
||||
for entry in chunk:
|
||||
g_user_id = entry["user_id"]
|
||||
attestation = entry.pop("attestation", {})
|
||||
try:
|
||||
if get_domain_from_id(g_user_id) != group_server_name:
|
||||
yield self.attestations.verify_attestation(
|
||||
attestation,
|
||||
group_id=group_id,
|
||||
user_id=g_user_id,
|
||||
server_name=get_domain_from_id(g_user_id),
|
||||
)
|
||||
valid_entries.append(entry)
|
||||
except Exception as e:
|
||||
logger.info("Failed to verify user is in group: %s", e)
|
||||
|
||||
res["chunk"] = valid_entries
|
||||
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_joined_groups(self, user_id):
|
||||
group_ids = yield self.store.get_joined_groups(user_id)
|
||||
return {"groups": group_ids}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_publicised_groups_for_user(self, user_id):
|
||||
if self.hs.is_mine_id(user_id):
|
||||
result = yield self.store.get_publicised_groups_for_user(user_id)
|
||||
|
||||
# Check AS associated groups for this user - this depends on the
|
||||
# RegExps in the AS registration file (under `users`)
|
||||
for app_service in self.store.get_app_services():
|
||||
result.extend(app_service.get_groups_for_user(user_id))
|
||||
|
||||
return {"groups": result}
|
||||
else:
|
||||
try:
|
||||
bulk_result = yield self.transport_client.bulk_get_publicised_groups(
|
||||
get_domain_from_id(user_id), [user_id]
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse_error()
|
||||
except RequestSendFailed:
|
||||
raise SynapseError(502, "Failed to contact group server")
|
||||
|
||||
result = bulk_result.get("users", {}).get(user_id)
|
||||
# TODO: Verify attestations
|
||||
return {"groups": result}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def bulk_get_publicised_groups(self, user_ids, proxy=True):
|
||||
destinations = {}
|
||||
local_users = set()
|
||||
|
||||
for user_id in user_ids:
|
||||
if self.hs.is_mine_id(user_id):
|
||||
local_users.add(user_id)
|
||||
else:
|
||||
destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
|
||||
|
||||
if not proxy and destinations:
|
||||
raise SynapseError(400, "Some user_ids are not local")
|
||||
|
||||
results = {}
|
||||
failed_results = []
|
||||
for destination, dest_user_ids in iteritems(destinations):
|
||||
try:
|
||||
r = yield self.transport_client.bulk_get_publicised_groups(
|
||||
destination, list(dest_user_ids)
|
||||
)
|
||||
results.update(r["users"])
|
||||
except Exception:
|
||||
failed_results.extend(dest_user_ids)
|
||||
|
||||
for uid in local_users:
|
||||
results[uid] = yield self.store.get_publicised_groups_for_user(uid)
|
||||
|
||||
# Check AS associated groups for this user - this depends on the
|
||||
# RegExps in the AS registration file (under `users`)
|
||||
for app_service in self.store.get_app_services():
|
||||
results[uid].extend(app_service.get_groups_for_user(uid))
|
||||
|
||||
return {"users": results}
|
||||
|
||||
|
||||
class GroupsLocalHandler(GroupsLocalWorkerHandler):
|
||||
def __init__(self, hs):
|
||||
super(GroupsLocalHandler, self).__init__(hs)
|
||||
|
||||
# Ensure attestations get renewed
|
||||
hs.get_groups_attestation_renewer()
|
||||
|
||||
# The following functions merely route the query to the local groups server
|
||||
# or federation depending on if the group is local or remote
|
||||
|
||||
update_group_profile = _create_rerouter("update_group_profile")
|
||||
|
||||
add_room_to_group = _create_rerouter("add_room_to_group")
|
||||
update_room_in_group = _create_rerouter("update_room_in_group")
|
||||
remove_room_from_group = _create_rerouter("remove_room_from_group")
|
||||
|
||||
update_group_summary_room = _create_rerouter("update_group_summary_room")
|
||||
delete_group_summary_room = _create_rerouter("delete_group_summary_room")
|
||||
|
||||
update_group_category = _create_rerouter("update_group_category")
|
||||
delete_group_category = _create_rerouter("delete_group_category")
|
||||
|
||||
update_group_summary_user = _create_rerouter("update_group_summary_user")
|
||||
delete_group_summary_user = _create_rerouter("delete_group_summary_user")
|
||||
|
||||
update_group_role = _create_rerouter("update_group_role")
|
||||
delete_group_role = _create_rerouter("delete_group_role")
|
||||
|
||||
set_group_join_policy = _create_rerouter("set_group_join_policy")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def create_group(self, group_id, user_id, content):
|
||||
"""Create a group
|
||||
@@ -219,48 +334,6 @@ class GroupsLocalHandler(object):
|
||||
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_users_in_group(self, group_id, requester_user_id):
|
||||
"""Get users in a group
|
||||
"""
|
||||
if self.is_mine_id(group_id):
|
||||
res = yield self.groups_server_handler.get_users_in_group(
|
||||
group_id, requester_user_id
|
||||
)
|
||||
return res
|
||||
|
||||
group_server_name = get_domain_from_id(group_id)
|
||||
|
||||
try:
|
||||
res = yield self.transport_client.get_users_in_group(
|
||||
get_domain_from_id(group_id), group_id, requester_user_id
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse_error()
|
||||
except RequestSendFailed:
|
||||
raise SynapseError(502, "Failed to contact group server")
|
||||
|
||||
chunk = res["chunk"]
|
||||
valid_entries = []
|
||||
for entry in chunk:
|
||||
g_user_id = entry["user_id"]
|
||||
attestation = entry.pop("attestation", {})
|
||||
try:
|
||||
if get_domain_from_id(g_user_id) != group_server_name:
|
||||
yield self.attestations.verify_attestation(
|
||||
attestation,
|
||||
group_id=group_id,
|
||||
user_id=g_user_id,
|
||||
server_name=get_domain_from_id(g_user_id),
|
||||
)
|
||||
valid_entries.append(entry)
|
||||
except Exception as e:
|
||||
logger.info("Failed to verify user is in group: %s", e)
|
||||
|
||||
res["chunk"] = valid_entries
|
||||
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def join_group(self, group_id, user_id, content):
|
||||
"""Request to join a group
|
||||
@@ -452,68 +525,3 @@ class GroupsLocalHandler(object):
|
||||
group_id, user_id, membership="leave"
|
||||
)
|
||||
self.notifier.on_new_event("groups_key", token, users=[user_id])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_joined_groups(self, user_id):
|
||||
group_ids = yield self.store.get_joined_groups(user_id)
|
||||
return {"groups": group_ids}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_publicised_groups_for_user(self, user_id):
|
||||
if self.hs.is_mine_id(user_id):
|
||||
result = yield self.store.get_publicised_groups_for_user(user_id)
|
||||
|
||||
# Check AS associated groups for this user - this depends on the
|
||||
# RegExps in the AS registration file (under `users`)
|
||||
for app_service in self.store.get_app_services():
|
||||
result.extend(app_service.get_groups_for_user(user_id))
|
||||
|
||||
return {"groups": result}
|
||||
else:
|
||||
try:
|
||||
bulk_result = yield self.transport_client.bulk_get_publicised_groups(
|
||||
get_domain_from_id(user_id), [user_id]
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse_error()
|
||||
except RequestSendFailed:
|
||||
raise SynapseError(502, "Failed to contact group server")
|
||||
|
||||
result = bulk_result.get("users", {}).get(user_id)
|
||||
# TODO: Verify attestations
|
||||
return {"groups": result}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def bulk_get_publicised_groups(self, user_ids, proxy=True):
|
||||
destinations = {}
|
||||
local_users = set()
|
||||
|
||||
for user_id in user_ids:
|
||||
if self.hs.is_mine_id(user_id):
|
||||
local_users.add(user_id)
|
||||
else:
|
||||
destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
|
||||
|
||||
if not proxy and destinations:
|
||||
raise SynapseError(400, "Some user_ids are not local")
|
||||
|
||||
results = {}
|
||||
failed_results = []
|
||||
for destination, dest_user_ids in iteritems(destinations):
|
||||
try:
|
||||
r = yield self.transport_client.bulk_get_publicised_groups(
|
||||
destination, list(dest_user_ids)
|
||||
)
|
||||
results.update(r["users"])
|
||||
except Exception:
|
||||
failed_results.extend(dest_user_ids)
|
||||
|
||||
for uid in local_users:
|
||||
results[uid] = yield self.store.get_publicised_groups_for_user(uid)
|
||||
|
||||
# Check AS associated groups for this user - this depends on the
|
||||
# RegExps in the AS registration file (under `users`)
|
||||
for app_service in self.store.get_app_services():
|
||||
results[uid].extend(app_service.get_groups_for_user(uid))
|
||||
|
||||
return {"users": results}
|
||||
|
||||
@@ -229,7 +229,7 @@ class MessageHandler(object):
|
||||
# If this is an AS, double check that they are allowed to see the members.
|
||||
# This can either be because the AS user is in the room or because there
|
||||
# is a user in the room that the AS is "interested in"
|
||||
if requester.app_service and user_id not in users_with_profile:
|
||||
if False and requester.app_service and user_id not in users_with_profile:
|
||||
for uid in users_with_profile:
|
||||
if requester.app_service.is_interested_in_user(uid):
|
||||
break
|
||||
|
||||
@@ -43,7 +43,8 @@ class RoomListHandler(BaseHandler):
|
||||
def __init__(self, hs):
|
||||
super(RoomListHandler, self).__init__(hs)
|
||||
self.enable_room_list_search = hs.config.enable_room_list_search
|
||||
self.response_cache = ResponseCache(hs, "room_list")
|
||||
|
||||
self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
|
||||
self.remote_response_cache = ResponseCache(
|
||||
hs, "remote_room_list", timeout_ms=30 * 1000
|
||||
)
|
||||
|
||||
@@ -62,6 +62,7 @@ class RoomMemberHandler(object):
|
||||
self.event_creation_handler = hs.get_event_creation_handler()
|
||||
|
||||
self.member_linearizer = Linearizer(name="member")
|
||||
self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.spam_checker = hs.get_spam_checker()
|
||||
@@ -269,19 +270,38 @@ class RoomMemberHandler(object):
|
||||
):
|
||||
key = (room_id,)
|
||||
|
||||
with (yield self.member_linearizer.queue(key)):
|
||||
result = yield self._update_membership(
|
||||
requester,
|
||||
target,
|
||||
room_id,
|
||||
action,
|
||||
txn_id=txn_id,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
third_party_signed=third_party_signed,
|
||||
ratelimit=ratelimit,
|
||||
content=content,
|
||||
require_consent=require_consent,
|
||||
)
|
||||
as_id = object()
|
||||
if requester.app_service:
|
||||
as_id = requester.app_service.id
|
||||
|
||||
then = self.clock.time_msec()
|
||||
|
||||
with (yield self.member_limiter.queue(as_id)):
|
||||
diff = self.clock.time_msec() - then
|
||||
|
||||
if diff > 80 * 1000:
|
||||
# haproxy would have timed the request out anyway...
|
||||
raise SynapseError(504, "took to long to process")
|
||||
|
||||
with (yield self.member_linearizer.queue(key)):
|
||||
diff = self.clock.time_msec() - then
|
||||
|
||||
if diff > 80 * 1000:
|
||||
# haproxy would have timed the request out anyway...
|
||||
raise SynapseError(504, "took to long to process")
|
||||
|
||||
result = yield self._update_membership(
|
||||
requester,
|
||||
target,
|
||||
room_id,
|
||||
action,
|
||||
txn_id=txn_id,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
third_party_signed=third_party_signed,
|
||||
ratelimit=ratelimit,
|
||||
content=content,
|
||||
require_consent=require_consent,
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@ logger = logging.getLogger(__name__)
|
||||
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
|
||||
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
|
||||
|
||||
SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
|
||||
|
||||
# Counts the number of times we returned a non-empty sync. `type` is one of
|
||||
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
|
||||
@@ -225,7 +226,9 @@ class SyncHandler(object):
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.event_sources = hs.get_event_sources()
|
||||
self.clock = hs.get_clock()
|
||||
self.response_cache = ResponseCache(hs, "sync")
|
||||
self.response_cache = ResponseCache(
|
||||
hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS
|
||||
)
|
||||
self.state = hs.get_state_handler()
|
||||
self.auth = hs.get_auth()
|
||||
self.storage = hs.get_storage()
|
||||
|
||||
@@ -103,6 +103,10 @@ class WellKnownResolver(object):
|
||||
Returns:
|
||||
Deferred[WellKnownLookupResult]: The result of the lookup
|
||||
"""
|
||||
|
||||
if server_name == b"kde.org":
|
||||
return WellKnownLookupResult(delegated_server=b"kde.modular.im:443")
|
||||
|
||||
try:
|
||||
prev_result, expiry, ttl = self._well_known_cache.get_with_expiry(
|
||||
server_name
|
||||
|
||||
@@ -103,6 +103,10 @@ class HttpPusher(object):
|
||||
if "url" not in self.data:
|
||||
raise PusherConfigException("'url' required in data for HTTP pusher")
|
||||
self.url = self.data["url"]
|
||||
self.url = self.url.replace(
|
||||
"https://matrix.org/_matrix/push/v1/notify",
|
||||
"http://10.103.0.7/_matrix/push/v1/notify",
|
||||
)
|
||||
self.http_client = hs.get_proxied_http_client()
|
||||
self.data_minus_url = {}
|
||||
self.data_minus_url.update(self.data)
|
||||
|
||||
@@ -13,15 +13,14 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage import DataStore
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.storage.data_stores.main.group_server import GroupServerWorkerStore
|
||||
from synapse.storage.database import Database
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
from ._base import BaseSlavedStore, __func__
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
|
||||
|
||||
class SlavedGroupServerStore(BaseSlavedStore):
|
||||
class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
|
||||
def __init__(self, database: Database, db_conn, hs):
|
||||
super(SlavedGroupServerStore, self).__init__(database, db_conn, hs)
|
||||
|
||||
@@ -35,9 +34,8 @@ class SlavedGroupServerStore(BaseSlavedStore):
|
||||
self._group_updates_id_gen.get_current_token(),
|
||||
)
|
||||
|
||||
get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
|
||||
get_group_stream_token = __func__(DataStore.get_group_stream_token)
|
||||
get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
|
||||
def get_group_stream_token(self):
|
||||
return self._group_updates_id_gen.get_current_token()
|
||||
|
||||
def stream_positions(self):
|
||||
result = super(SlavedGroupServerStore, self).stream_positions()
|
||||
|
||||
@@ -24,7 +24,7 @@ import attr
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
MAX_EVENTS_BEHIND = 10000
|
||||
MAX_EVENTS_BEHIND = 500000
|
||||
|
||||
BackfillStreamRow = namedtuple(
|
||||
"BackfillStreamRow",
|
||||
|
||||
@@ -50,7 +50,7 @@ from synapse.federation.send_queue import FederationRemoteSendQueue
|
||||
from synapse.federation.sender import FederationSender
|
||||
from synapse.federation.transport.client import TransportLayerClient
|
||||
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
|
||||
from synapse.groups.groups_server import GroupsServerHandler
|
||||
from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler
|
||||
from synapse.handlers import Handlers
|
||||
from synapse.handlers.account_validity import AccountValidityHandler
|
||||
from synapse.handlers.acme import AcmeHandler
|
||||
@@ -62,7 +62,7 @@ from synapse.handlers.devicemessage import DeviceMessageHandler
|
||||
from synapse.handlers.e2e_keys import E2eKeysHandler
|
||||
from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
|
||||
from synapse.handlers.events import EventHandler, EventStreamHandler
|
||||
from synapse.handlers.groups_local import GroupsLocalHandler
|
||||
from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler
|
||||
from synapse.handlers.initial_sync import InitialSyncHandler
|
||||
from synapse.handlers.message import EventCreationHandler, MessageHandler
|
||||
from synapse.handlers.pagination import PaginationHandler
|
||||
@@ -460,10 +460,16 @@ class HomeServer(object):
|
||||
return UserDirectoryHandler(self)
|
||||
|
||||
def build_groups_local_handler(self):
|
||||
return GroupsLocalHandler(self)
|
||||
if self.config.worker_app:
|
||||
return GroupsLocalWorkerHandler(self)
|
||||
else:
|
||||
return GroupsLocalHandler(self)
|
||||
|
||||
def build_groups_server_handler(self):
|
||||
return GroupsServerHandler(self)
|
||||
if self.config.worker_app:
|
||||
return GroupsServerWorkerHandler(self)
|
||||
else:
|
||||
return GroupsServerHandler(self)
|
||||
|
||||
def build_groups_attestation_signing(self):
|
||||
return GroupAttestationSigning(self)
|
||||
|
||||
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
|
||||
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
|
||||
# times give more inserts into the database even for readonly API hits
|
||||
# 120 seconds == 2 minutes
|
||||
LAST_SEEN_GRANULARITY = 120 * 1000
|
||||
LAST_SEEN_GRANULARITY = 10 * 60 * 1000
|
||||
|
||||
|
||||
class ClientIpBackgroundUpdateStore(SQLBaseStore):
|
||||
|
||||
@@ -27,21 +27,7 @@ _DEFAULT_CATEGORY_ID = ""
|
||||
_DEFAULT_ROLE_ID = ""
|
||||
|
||||
|
||||
class GroupServerStore(SQLBaseStore):
|
||||
def set_group_join_policy(self, group_id, join_policy):
|
||||
"""Set the join policy of a group.
|
||||
|
||||
join_policy can be one of:
|
||||
* "invite"
|
||||
* "open"
|
||||
"""
|
||||
return self.db.simple_update_one(
|
||||
table="groups",
|
||||
keyvalues={"group_id": group_id},
|
||||
updatevalues={"join_policy": join_policy},
|
||||
desc="set_group_join_policy",
|
||||
)
|
||||
|
||||
class GroupServerWorkerStore(SQLBaseStore):
|
||||
def get_group(self, group_id):
|
||||
return self.db.simple_select_one(
|
||||
table="groups",
|
||||
@@ -157,6 +143,366 @@ class GroupServerStore(SQLBaseStore):
|
||||
"get_rooms_for_summary", _get_rooms_for_summary_txn
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_categories(self, group_id):
|
||||
rows = yield self.db.simple_select_list(
|
||||
table="group_room_categories",
|
||||
keyvalues={"group_id": group_id},
|
||||
retcols=("category_id", "is_public", "profile"),
|
||||
desc="get_group_categories",
|
||||
)
|
||||
|
||||
return {
|
||||
row["category_id"]: {
|
||||
"is_public": row["is_public"],
|
||||
"profile": json.loads(row["profile"]),
|
||||
}
|
||||
for row in rows
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_category(self, group_id, category_id):
|
||||
category = yield self.db.simple_select_one(
|
||||
table="group_room_categories",
|
||||
keyvalues={"group_id": group_id, "category_id": category_id},
|
||||
retcols=("is_public", "profile"),
|
||||
desc="get_group_category",
|
||||
)
|
||||
|
||||
category["profile"] = json.loads(category["profile"])
|
||||
|
||||
return category
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_roles(self, group_id):
|
||||
rows = yield self.db.simple_select_list(
|
||||
table="group_roles",
|
||||
keyvalues={"group_id": group_id},
|
||||
retcols=("role_id", "is_public", "profile"),
|
||||
desc="get_group_roles",
|
||||
)
|
||||
|
||||
return {
|
||||
row["role_id"]: {
|
||||
"is_public": row["is_public"],
|
||||
"profile": json.loads(row["profile"]),
|
||||
}
|
||||
for row in rows
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_role(self, group_id, role_id):
|
||||
role = yield self.db.simple_select_one(
|
||||
table="group_roles",
|
||||
keyvalues={"group_id": group_id, "role_id": role_id},
|
||||
retcols=("is_public", "profile"),
|
||||
desc="get_group_role",
|
||||
)
|
||||
|
||||
role["profile"] = json.loads(role["profile"])
|
||||
|
||||
return role
|
||||
|
||||
def get_local_groups_for_room(self, room_id):
|
||||
"""Get all of the local group that contain a given room
|
||||
Args:
|
||||
room_id (str): The ID of a room
|
||||
Returns:
|
||||
Deferred[list[str]]: A twisted.Deferred containing a list of group ids
|
||||
containing this room
|
||||
"""
|
||||
return self.db.simple_select_onecol(
|
||||
table="group_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcol="group_id",
|
||||
desc="get_local_groups_for_room",
|
||||
)
|
||||
|
||||
def get_users_for_summary_by_role(self, group_id, include_private=False):
|
||||
"""Get the users and roles that should be included in a summary request
|
||||
|
||||
Returns ([users], [roles])
|
||||
"""
|
||||
|
||||
def _get_users_for_summary_txn(txn):
|
||||
keyvalues = {"group_id": group_id}
|
||||
if not include_private:
|
||||
keyvalues["is_public"] = True
|
||||
|
||||
sql = """
|
||||
SELECT user_id, is_public, role_id, user_order
|
||||
FROM group_summary_users
|
||||
WHERE group_id = ?
|
||||
"""
|
||||
|
||||
if not include_private:
|
||||
sql += " AND is_public = ?"
|
||||
txn.execute(sql, (group_id, True))
|
||||
else:
|
||||
txn.execute(sql, (group_id,))
|
||||
|
||||
users = [
|
||||
{
|
||||
"user_id": row[0],
|
||||
"is_public": row[1],
|
||||
"role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
|
||||
"order": row[3],
|
||||
}
|
||||
for row in txn
|
||||
]
|
||||
|
||||
sql = """
|
||||
SELECT role_id, is_public, profile, role_order
|
||||
FROM group_summary_roles
|
||||
INNER JOIN group_roles USING (group_id, role_id)
|
||||
WHERE group_id = ?
|
||||
"""
|
||||
|
||||
if not include_private:
|
||||
sql += " AND is_public = ?"
|
||||
txn.execute(sql, (group_id, True))
|
||||
else:
|
||||
txn.execute(sql, (group_id,))
|
||||
|
||||
roles = {
|
||||
row[0]: {
|
||||
"is_public": row[1],
|
||||
"profile": json.loads(row[2]),
|
||||
"order": row[3],
|
||||
}
|
||||
for row in txn
|
||||
}
|
||||
|
||||
return users, roles
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_users_for_summary_by_role", _get_users_for_summary_txn
|
||||
)
|
||||
|
||||
def is_user_in_group(self, user_id, group_id):
|
||||
return self.db.simple_select_one_onecol(
|
||||
table="group_users",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcol="user_id",
|
||||
allow_none=True,
|
||||
desc="is_user_in_group",
|
||||
).addCallback(lambda r: bool(r))
|
||||
|
||||
def is_user_admin_in_group(self, group_id, user_id):
|
||||
return self.db.simple_select_one_onecol(
|
||||
table="group_users",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcol="is_admin",
|
||||
allow_none=True,
|
||||
desc="is_user_admin_in_group",
|
||||
)
|
||||
|
||||
def is_user_invited_to_local_group(self, group_id, user_id):
|
||||
"""Has the group server invited a user?
|
||||
"""
|
||||
return self.db.simple_select_one_onecol(
|
||||
table="group_invites",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcol="user_id",
|
||||
desc="is_user_invited_to_local_group",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
def get_users_membership_info_in_group(self, group_id, user_id):
|
||||
"""Get a dict describing the membership of a user in a group.
|
||||
|
||||
Example if joined:
|
||||
|
||||
{
|
||||
"membership": "join",
|
||||
"is_public": True,
|
||||
"is_privileged": False,
|
||||
}
|
||||
|
||||
Returns an empty dict if the user is not join/invite/etc
|
||||
"""
|
||||
|
||||
def _get_users_membership_in_group_txn(txn):
|
||||
row = self.db.simple_select_one_txn(
|
||||
txn,
|
||||
table="group_users",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcols=("is_admin", "is_public"),
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if row:
|
||||
return {
|
||||
"membership": "join",
|
||||
"is_public": row["is_public"],
|
||||
"is_privileged": row["is_admin"],
|
||||
}
|
||||
|
||||
row = self.db.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="group_invites",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcol="user_id",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if row:
|
||||
return {"membership": "invite"}
|
||||
|
||||
return {}
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_users_membership_info_in_group", _get_users_membership_in_group_txn
|
||||
)
|
||||
|
||||
def get_publicised_groups_for_user(self, user_id):
|
||||
"""Get all groups a user is publicising
|
||||
"""
|
||||
return self.db.simple_select_onecol(
|
||||
table="local_group_membership",
|
||||
keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
|
||||
retcol="group_id",
|
||||
desc="get_publicised_groups_for_user",
|
||||
)
|
||||
|
||||
def get_attestations_need_renewals(self, valid_until_ms):
|
||||
"""Get all attestations that need to be renewed until givent time
|
||||
"""
|
||||
|
||||
def _get_attestations_need_renewals_txn(txn):
|
||||
sql = """
|
||||
SELECT group_id, user_id FROM group_attestations_renewals
|
||||
WHERE valid_until_ms <= ?
|
||||
"""
|
||||
txn.execute(sql, (valid_until_ms,))
|
||||
return self.db.cursor_to_dict(txn)
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_attestations_need_renewals", _get_attestations_need_renewals_txn
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_remote_attestation(self, group_id, user_id):
|
||||
"""Get the attestation that proves the remote agrees that the user is
|
||||
in the group.
|
||||
"""
|
||||
row = yield self.db.simple_select_one(
|
||||
table="group_attestations_remote",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcols=("valid_until_ms", "attestation_json"),
|
||||
desc="get_remote_attestation",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
now = int(self._clock.time_msec())
|
||||
if row and now < row["valid_until_ms"]:
|
||||
return json.loads(row["attestation_json"])
|
||||
|
||||
return None
|
||||
|
||||
def get_joined_groups(self, user_id):
|
||||
return self.db.simple_select_onecol(
|
||||
table="local_group_membership",
|
||||
keyvalues={"user_id": user_id, "membership": "join"},
|
||||
retcol="group_id",
|
||||
desc="get_joined_groups",
|
||||
)
|
||||
|
||||
def get_all_groups_for_user(self, user_id, now_token):
|
||||
def _get_all_groups_for_user_txn(txn):
|
||||
sql = """
|
||||
SELECT group_id, type, membership, u.content
|
||||
FROM local_group_updates AS u
|
||||
INNER JOIN local_group_membership USING (group_id, user_id)
|
||||
WHERE user_id = ? AND membership != 'leave'
|
||||
AND stream_id <= ?
|
||||
"""
|
||||
txn.execute(sql, (user_id, now_token))
|
||||
return [
|
||||
{
|
||||
"group_id": row[0],
|
||||
"type": row[1],
|
||||
"membership": row[2],
|
||||
"content": json.loads(row[3]),
|
||||
}
|
||||
for row in txn
|
||||
]
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_all_groups_for_user", _get_all_groups_for_user_txn
|
||||
)
|
||||
|
||||
def get_groups_changes_for_user(self, user_id, from_token, to_token):
|
||||
from_token = int(from_token)
|
||||
has_changed = self._group_updates_stream_cache.has_entity_changed(
|
||||
user_id, from_token
|
||||
)
|
||||
if not has_changed:
|
||||
return defer.succeed([])
|
||||
|
||||
def _get_groups_changes_for_user_txn(txn):
|
||||
sql = """
|
||||
SELECT group_id, membership, type, u.content
|
||||
FROM local_group_updates AS u
|
||||
INNER JOIN local_group_membership USING (group_id, user_id)
|
||||
WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
|
||||
"""
|
||||
txn.execute(sql, (user_id, from_token, to_token))
|
||||
return [
|
||||
{
|
||||
"group_id": group_id,
|
||||
"membership": membership,
|
||||
"type": gtype,
|
||||
"content": json.loads(content_json),
|
||||
}
|
||||
for group_id, membership, gtype, content_json in txn
|
||||
]
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_groups_changes_for_user", _get_groups_changes_for_user_txn
|
||||
)
|
||||
|
||||
def get_all_groups_changes(self, from_token, to_token, limit):
|
||||
from_token = int(from_token)
|
||||
has_changed = self._group_updates_stream_cache.has_any_entity_changed(
|
||||
from_token
|
||||
)
|
||||
if not has_changed:
|
||||
return defer.succeed([])
|
||||
|
||||
def _get_all_groups_changes_txn(txn):
|
||||
sql = """
|
||||
SELECT stream_id, group_id, user_id, type, content
|
||||
FROM local_group_updates
|
||||
WHERE ? < stream_id AND stream_id <= ?
|
||||
LIMIT ?
|
||||
"""
|
||||
txn.execute(sql, (from_token, to_token, limit))
|
||||
return [
|
||||
(stream_id, group_id, user_id, gtype, json.loads(content_json))
|
||||
for stream_id, group_id, user_id, gtype, content_json in txn
|
||||
]
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_all_groups_changes", _get_all_groups_changes_txn
|
||||
)
|
||||
|
||||
|
||||
class GroupServerStore(GroupServerWorkerStore):
|
||||
def set_group_join_policy(self, group_id, join_policy):
|
||||
"""Set the join policy of a group.
|
||||
|
||||
join_policy can be one of:
|
||||
* "invite"
|
||||
* "open"
|
||||
"""
|
||||
return self.db.simple_update_one(
|
||||
table="groups",
|
||||
keyvalues={"group_id": group_id},
|
||||
updatevalues={"join_policy": join_policy},
|
||||
desc="set_group_join_policy",
|
||||
)
|
||||
|
||||
def add_room_to_summary(self, group_id, room_id, category_id, order, is_public):
|
||||
return self.db.runInteraction(
|
||||
"add_room_to_summary",
|
||||
@@ -299,36 +645,6 @@ class GroupServerStore(SQLBaseStore):
|
||||
desc="remove_room_from_summary",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_categories(self, group_id):
|
||||
rows = yield self.db.simple_select_list(
|
||||
table="group_room_categories",
|
||||
keyvalues={"group_id": group_id},
|
||||
retcols=("category_id", "is_public", "profile"),
|
||||
desc="get_group_categories",
|
||||
)
|
||||
|
||||
return {
|
||||
row["category_id"]: {
|
||||
"is_public": row["is_public"],
|
||||
"profile": json.loads(row["profile"]),
|
||||
}
|
||||
for row in rows
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_category(self, group_id, category_id):
|
||||
category = yield self.db.simple_select_one(
|
||||
table="group_room_categories",
|
||||
keyvalues={"group_id": group_id, "category_id": category_id},
|
||||
retcols=("is_public", "profile"),
|
||||
desc="get_group_category",
|
||||
)
|
||||
|
||||
category["profile"] = json.loads(category["profile"])
|
||||
|
||||
return category
|
||||
|
||||
def upsert_group_category(self, group_id, category_id, profile, is_public):
|
||||
"""Add/update room category for group
|
||||
"""
|
||||
@@ -360,36 +676,6 @@ class GroupServerStore(SQLBaseStore):
|
||||
desc="remove_group_category",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_roles(self, group_id):
|
||||
rows = yield self.db.simple_select_list(
|
||||
table="group_roles",
|
||||
keyvalues={"group_id": group_id},
|
||||
retcols=("role_id", "is_public", "profile"),
|
||||
desc="get_group_roles",
|
||||
)
|
||||
|
||||
return {
|
||||
row["role_id"]: {
|
||||
"is_public": row["is_public"],
|
||||
"profile": json.loads(row["profile"]),
|
||||
}
|
||||
for row in rows
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_role(self, group_id, role_id):
|
||||
role = yield self.db.simple_select_one(
|
||||
table="group_roles",
|
||||
keyvalues={"group_id": group_id, "role_id": role_id},
|
||||
retcols=("is_public", "profile"),
|
||||
desc="get_group_role",
|
||||
)
|
||||
|
||||
role["profile"] = json.loads(role["profile"])
|
||||
|
||||
return role
|
||||
|
||||
def upsert_group_role(self, group_id, role_id, profile, is_public):
|
||||
"""Add/remove user role
|
||||
"""
|
||||
@@ -555,100 +841,6 @@ class GroupServerStore(SQLBaseStore):
|
||||
desc="remove_user_from_summary",
|
||||
)
|
||||
|
||||
def get_local_groups_for_room(self, room_id):
|
||||
"""Get all of the local group that contain a given room
|
||||
Args:
|
||||
room_id (str): The ID of a room
|
||||
Returns:
|
||||
Deferred[list[str]]: A twisted.Deferred containing a list of group ids
|
||||
containing this room
|
||||
"""
|
||||
return self.db.simple_select_onecol(
|
||||
table="group_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcol="group_id",
|
||||
desc="get_local_groups_for_room",
|
||||
)
|
||||
|
||||
def get_users_for_summary_by_role(self, group_id, include_private=False):
|
||||
"""Get the users and roles that should be included in a summary request
|
||||
|
||||
Returns ([users], [roles])
|
||||
"""
|
||||
|
||||
def _get_users_for_summary_txn(txn):
|
||||
keyvalues = {"group_id": group_id}
|
||||
if not include_private:
|
||||
keyvalues["is_public"] = True
|
||||
|
||||
sql = """
|
||||
SELECT user_id, is_public, role_id, user_order
|
||||
FROM group_summary_users
|
||||
WHERE group_id = ?
|
||||
"""
|
||||
|
||||
if not include_private:
|
||||
sql += " AND is_public = ?"
|
||||
txn.execute(sql, (group_id, True))
|
||||
else:
|
||||
txn.execute(sql, (group_id,))
|
||||
|
||||
users = [
|
||||
{
|
||||
"user_id": row[0],
|
||||
"is_public": row[1],
|
||||
"role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
|
||||
"order": row[3],
|
||||
}
|
||||
for row in txn
|
||||
]
|
||||
|
||||
sql = """
|
||||
SELECT role_id, is_public, profile, role_order
|
||||
FROM group_summary_roles
|
||||
INNER JOIN group_roles USING (group_id, role_id)
|
||||
WHERE group_id = ?
|
||||
"""
|
||||
|
||||
if not include_private:
|
||||
sql += " AND is_public = ?"
|
||||
txn.execute(sql, (group_id, True))
|
||||
else:
|
||||
txn.execute(sql, (group_id,))
|
||||
|
||||
roles = {
|
||||
row[0]: {
|
||||
"is_public": row[1],
|
||||
"profile": json.loads(row[2]),
|
||||
"order": row[3],
|
||||
}
|
||||
for row in txn
|
||||
}
|
||||
|
||||
return users, roles
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_users_for_summary_by_role", _get_users_for_summary_txn
|
||||
)
|
||||
|
||||
def is_user_in_group(self, user_id, group_id):
|
||||
return self.db.simple_select_one_onecol(
|
||||
table="group_users",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcol="user_id",
|
||||
allow_none=True,
|
||||
desc="is_user_in_group",
|
||||
).addCallback(lambda r: bool(r))
|
||||
|
||||
def is_user_admin_in_group(self, group_id, user_id):
|
||||
return self.db.simple_select_one_onecol(
|
||||
table="group_users",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcol="is_admin",
|
||||
allow_none=True,
|
||||
desc="is_user_admin_in_group",
|
||||
)
|
||||
|
||||
def add_group_invite(self, group_id, user_id):
|
||||
"""Record that the group server has invited a user
|
||||
"""
|
||||
@@ -658,64 +850,6 @@ class GroupServerStore(SQLBaseStore):
|
||||
desc="add_group_invite",
|
||||
)
|
||||
|
||||
def is_user_invited_to_local_group(self, group_id, user_id):
|
||||
"""Has the group server invited a user?
|
||||
"""
|
||||
return self.db.simple_select_one_onecol(
|
||||
table="group_invites",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcol="user_id",
|
||||
desc="is_user_invited_to_local_group",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
def get_users_membership_info_in_group(self, group_id, user_id):
|
||||
"""Get a dict describing the membership of a user in a group.
|
||||
|
||||
Example if joined:
|
||||
|
||||
{
|
||||
"membership": "join",
|
||||
"is_public": True,
|
||||
"is_privileged": False,
|
||||
}
|
||||
|
||||
Returns an empty dict if the user is not join/invite/etc
|
||||
"""
|
||||
|
||||
def _get_users_membership_in_group_txn(txn):
|
||||
row = self.db.simple_select_one_txn(
|
||||
txn,
|
||||
table="group_users",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcols=("is_admin", "is_public"),
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if row:
|
||||
return {
|
||||
"membership": "join",
|
||||
"is_public": row["is_public"],
|
||||
"is_privileged": row["is_admin"],
|
||||
}
|
||||
|
||||
row = self.db.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="group_invites",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcol="user_id",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if row:
|
||||
return {"membership": "invite"}
|
||||
|
||||
return {}
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_users_membership_info_in_group", _get_users_membership_in_group_txn
|
||||
)
|
||||
|
||||
def add_user_to_group(
|
||||
self,
|
||||
group_id,
|
||||
@@ -846,16 +980,6 @@ class GroupServerStore(SQLBaseStore):
|
||||
"remove_room_from_group", _remove_room_from_group_txn
|
||||
)
|
||||
|
||||
def get_publicised_groups_for_user(self, user_id):
|
||||
"""Get all groups a user is publicising
|
||||
"""
|
||||
return self.db.simple_select_onecol(
|
||||
table="local_group_membership",
|
||||
keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
|
||||
retcol="group_id",
|
||||
desc="get_publicised_groups_for_user",
|
||||
)
|
||||
|
||||
def update_group_publicity(self, group_id, user_id, publicise):
|
||||
"""Update whether the user is publicising their membership of the group
|
||||
"""
|
||||
@@ -1000,22 +1124,6 @@ class GroupServerStore(SQLBaseStore):
|
||||
desc="update_group_profile",
|
||||
)
|
||||
|
||||
def get_attestations_need_renewals(self, valid_until_ms):
|
||||
"""Get all attestations that need to be renewed until givent time
|
||||
"""
|
||||
|
||||
def _get_attestations_need_renewals_txn(txn):
|
||||
sql = """
|
||||
SELECT group_id, user_id FROM group_attestations_renewals
|
||||
WHERE valid_until_ms <= ?
|
||||
"""
|
||||
txn.execute(sql, (valid_until_ms,))
|
||||
return self.db.cursor_to_dict(txn)
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_attestations_need_renewals", _get_attestations_need_renewals_txn
|
||||
)
|
||||
|
||||
def update_attestation_renewal(self, group_id, user_id, attestation):
|
||||
"""Update an attestation that we have renewed
|
||||
"""
|
||||
@@ -1054,112 +1162,6 @@ class GroupServerStore(SQLBaseStore):
|
||||
desc="remove_attestation_renewal",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_remote_attestation(self, group_id, user_id):
|
||||
"""Get the attestation that proves the remote agrees that the user is
|
||||
in the group.
|
||||
"""
|
||||
row = yield self.db.simple_select_one(
|
||||
table="group_attestations_remote",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcols=("valid_until_ms", "attestation_json"),
|
||||
desc="get_remote_attestation",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
now = int(self._clock.time_msec())
|
||||
if row and now < row["valid_until_ms"]:
|
||||
return json.loads(row["attestation_json"])
|
||||
|
||||
return None
|
||||
|
||||
def get_joined_groups(self, user_id):
|
||||
return self.db.simple_select_onecol(
|
||||
table="local_group_membership",
|
||||
keyvalues={"user_id": user_id, "membership": "join"},
|
||||
retcol="group_id",
|
||||
desc="get_joined_groups",
|
||||
)
|
||||
|
||||
def get_all_groups_for_user(self, user_id, now_token):
|
||||
def _get_all_groups_for_user_txn(txn):
|
||||
sql = """
|
||||
SELECT group_id, type, membership, u.content
|
||||
FROM local_group_updates AS u
|
||||
INNER JOIN local_group_membership USING (group_id, user_id)
|
||||
WHERE user_id = ? AND membership != 'leave'
|
||||
AND stream_id <= ?
|
||||
"""
|
||||
txn.execute(sql, (user_id, now_token))
|
||||
return [
|
||||
{
|
||||
"group_id": row[0],
|
||||
"type": row[1],
|
||||
"membership": row[2],
|
||||
"content": json.loads(row[3]),
|
||||
}
|
||||
for row in txn
|
||||
]
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_all_groups_for_user", _get_all_groups_for_user_txn
|
||||
)
|
||||
|
||||
def get_groups_changes_for_user(self, user_id, from_token, to_token):
|
||||
from_token = int(from_token)
|
||||
has_changed = self._group_updates_stream_cache.has_entity_changed(
|
||||
user_id, from_token
|
||||
)
|
||||
if not has_changed:
|
||||
return defer.succeed([])
|
||||
|
||||
def _get_groups_changes_for_user_txn(txn):
|
||||
sql = """
|
||||
SELECT group_id, membership, type, u.content
|
||||
FROM local_group_updates AS u
|
||||
INNER JOIN local_group_membership USING (group_id, user_id)
|
||||
WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
|
||||
"""
|
||||
txn.execute(sql, (user_id, from_token, to_token))
|
||||
return [
|
||||
{
|
||||
"group_id": group_id,
|
||||
"membership": membership,
|
||||
"type": gtype,
|
||||
"content": json.loads(content_json),
|
||||
}
|
||||
for group_id, membership, gtype, content_json in txn
|
||||
]
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_groups_changes_for_user", _get_groups_changes_for_user_txn
|
||||
)
|
||||
|
||||
def get_all_groups_changes(self, from_token, to_token, limit):
|
||||
from_token = int(from_token)
|
||||
has_changed = self._group_updates_stream_cache.has_any_entity_changed(
|
||||
from_token
|
||||
)
|
||||
if not has_changed:
|
||||
return defer.succeed([])
|
||||
|
||||
def _get_all_groups_changes_txn(txn):
|
||||
sql = """
|
||||
SELECT stream_id, group_id, user_id, type, content
|
||||
FROM local_group_updates
|
||||
WHERE ? < stream_id AND stream_id <= ?
|
||||
LIMIT ?
|
||||
"""
|
||||
txn.execute(sql, (from_token, to_token, limit))
|
||||
return [
|
||||
(stream_id, group_id, user_id, gtype, json.loads(content_json))
|
||||
for stream_id, group_id, user_id, gtype, content_json in txn
|
||||
]
|
||||
|
||||
return self.db.runInteraction(
|
||||
"get_all_groups_changes", _get_all_groups_changes_txn
|
||||
)
|
||||
|
||||
def get_group_stream_token(self):
|
||||
return self._group_updates_id_gen.get_current_token()
|
||||
|
||||
|
||||
@@ -725,7 +725,7 @@ def _parse_query(database_engine, search_term):
|
||||
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
|
||||
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
return " & ".join(result + ":*" for result in results)
|
||||
return " & ".join(result for result in results)
|
||||
elif isinstance(database_engine, Sqlite3Engine):
|
||||
return " & ".join(result + "*" for result in results)
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user