mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
2 Commits
anoa/log_e
...
hs/as-synt
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ba449705d1 | ||
|
|
9c4d018e4e |
@@ -187,6 +187,7 @@ class ApplicationService:
|
||||
for user_id in member_list:
|
||||
if self.is_interested_in_user(user_id):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _matches_room_id(self, event: EventBase) -> bool:
|
||||
@@ -233,6 +234,15 @@ class ApplicationService:
|
||||
|
||||
return False
|
||||
|
||||
def is_interested_in_synthetic_user_event(self, event_type: str, user_id: UserID):
|
||||
for regex_obj in self.namespaces["users"]:
|
||||
if not regex_obj["regex"].match(user_id):
|
||||
continue
|
||||
# TODO: Validate structure further up.
|
||||
if event_type in regex_obj.get("uk.half-shot.msc3395.synthetic_events", {"events": []})["events"]:
|
||||
return True
|
||||
return False
|
||||
|
||||
@cached(num_args=1)
|
||||
async def is_interested_in_presence(
|
||||
self, user_id: UserID, store: "DataStore"
|
||||
@@ -258,10 +268,10 @@ class ApplicationService:
|
||||
return False
|
||||
|
||||
def is_interested_in_user(self, user_id: str) -> bool:
|
||||
return (
|
||||
bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
|
||||
or user_id == self.sender
|
||||
)
|
||||
if user_id == self.sender:
|
||||
return True
|
||||
regex_obj = self._matches_regex(user_id, ApplicationService.NS_USERS)
|
||||
return regex_obj and regex_obj.get("uk.half-shot.msc3395.events", True)
|
||||
|
||||
def is_interested_in_alias(self, alias: str) -> bool:
|
||||
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
|
||||
@@ -329,11 +339,13 @@ class AppServiceTransaction:
|
||||
id: int,
|
||||
events: List[EventBase],
|
||||
ephemeral: List[JsonDict],
|
||||
synthetic_events: Optional[JsonDict] = None,
|
||||
):
|
||||
self.service = service
|
||||
self.id = id
|
||||
self.events = events
|
||||
self.ephemeral = ephemeral
|
||||
self.synthetic_events = synthetic_events
|
||||
|
||||
async def send(self, as_api: "ApplicationServiceApi") -> bool:
|
||||
"""Sends this transaction using the provided AS API interface.
|
||||
@@ -347,6 +359,7 @@ class AppServiceTransaction:
|
||||
service=self.service,
|
||||
events=self.events,
|
||||
ephemeral=self.ephemeral,
|
||||
synthetic_events=self.synthetic_events,
|
||||
txn_id=self.id,
|
||||
)
|
||||
|
||||
|
||||
@@ -203,6 +203,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
service: "ApplicationService",
|
||||
events: List[EventBase],
|
||||
ephemeral: List[JsonDict],
|
||||
synthetic_events: Optional[List[JsonDict]],
|
||||
txn_id: Optional[int] = None,
|
||||
):
|
||||
if service.url is None:
|
||||
@@ -218,11 +219,15 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
|
||||
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
|
||||
|
||||
body = {"events": events}
|
||||
|
||||
# Never send ephemeral events to appservices that do not support it
|
||||
if service.supports_ephemeral:
|
||||
body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
|
||||
else:
|
||||
body = {"events": events}
|
||||
body["de.sorunome.msc2409.ephemeral"] = ephemeral
|
||||
|
||||
# We will only populate this if the appservice requests synthetic events
|
||||
if synthetic_events and len(synthetic_events):
|
||||
body["uk.half-shot.msc3395.synthetic_events"] = synthetic_events
|
||||
|
||||
try:
|
||||
await self.put_json(
|
||||
|
||||
@@ -65,6 +65,8 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
|
||||
# Maximum number of ephemeral events to provide in an AS transaction.
|
||||
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
|
||||
|
||||
# Maximum number of syntethci events to provide in an AS transaction.
|
||||
MAX_SYNTHETIC_EVENTS_PER_TRANSACTION = 100
|
||||
|
||||
class ApplicationServiceScheduler:
|
||||
"""Public facing API for this module. Does the required DI to tie the
|
||||
@@ -99,6 +101,11 @@ class ApplicationServiceScheduler:
|
||||
):
|
||||
self.queuer.enqueue_ephemeral(service, events)
|
||||
|
||||
def submit_synthetic_events_for_as(
|
||||
self, service: ApplicationService, events: List[JsonDict]
|
||||
):
|
||||
self.queuer.enqueue_ephemeral(service, events)
|
||||
|
||||
|
||||
class _ServiceQueuer:
|
||||
"""Queue of events waiting to be sent to appservices.
|
||||
@@ -111,6 +118,7 @@ class _ServiceQueuer:
|
||||
def __init__(self, txn_ctrl, clock):
|
||||
self.queued_events = {} # dict of {service_id: [events]}
|
||||
self.queued_ephemeral = {} # dict of {service_id: [events]}
|
||||
self.queued_synthetic = {} # dict of {service_id: [events]}
|
||||
|
||||
# the appservices which currently have a transaction in flight
|
||||
self.requests_in_flight = set()
|
||||
@@ -134,6 +142,10 @@ class _ServiceQueuer:
|
||||
self.queued_ephemeral.setdefault(service.id, []).extend(events)
|
||||
self._start_background_request(service)
|
||||
|
||||
def enqueue_syntheic(self, service: ApplicationService, events: List[JsonDict]):
|
||||
self.queued_synthetic.setdefault(service.id, []).extend(events)
|
||||
self._start_background_request(service)
|
||||
|
||||
async def _send_request(self, service: ApplicationService):
|
||||
# sanity-check: we shouldn't get here if this service already has a sender
|
||||
# running.
|
||||
@@ -150,11 +162,15 @@ class _ServiceQueuer:
|
||||
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
|
||||
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
|
||||
|
||||
if not events and not ephemeral:
|
||||
all_events_synthetic = self.queued_synthetic.get(service.id, [])
|
||||
synthetic = all_events_ephemeral[:MAX_SYNTHETIC_EVENTS_PER_TRANSACTION]
|
||||
del all_events_synthetic[:MAX_SYNTHETIC_EVENTS_PER_TRANSACTION]
|
||||
|
||||
if not events and not ephemeral and not synthetic:
|
||||
return
|
||||
|
||||
try:
|
||||
await self.txn_ctrl.send(service, events, ephemeral)
|
||||
await self.txn_ctrl.send(service, events, ephemeral, synthetic)
|
||||
except Exception:
|
||||
logger.exception("AS request failed")
|
||||
finally:
|
||||
@@ -191,10 +207,11 @@ class _TransactionController:
|
||||
service: ApplicationService,
|
||||
events: List[EventBase],
|
||||
ephemeral: Optional[List[JsonDict]] = None,
|
||||
synthetic_events: Optional[List[JsonDict]] = None,
|
||||
):
|
||||
try:
|
||||
txn = await self.store.create_appservice_txn(
|
||||
service=service, events=events, ephemeral=ephemeral or []
|
||||
service=service, events=events, ephemeral=ephemeral or [], synthetic_events=synthetic_events
|
||||
)
|
||||
service_is_up = await self._is_service_up(service)
|
||||
if service_is_up:
|
||||
|
||||
@@ -179,6 +179,58 @@ class ApplicationServicesHandler:
|
||||
finally:
|
||||
self.is_processing = False
|
||||
|
||||
def notify_synthetic_event(
|
||||
self,
|
||||
event_type: str,
|
||||
user_id: UserID,
|
||||
content: JsonDict,
|
||||
) -> None:
|
||||
"""This is called when another service wishes to
|
||||
notify about a synthetic event.
|
||||
|
||||
This will determine which appservices
|
||||
are interested in the event, and submit them.
|
||||
|
||||
Events will only be pushed to appservices
|
||||
that have opted into ephemeral events
|
||||
|
||||
Args:
|
||||
event_type: The type of event to notify about.
|
||||
user_id: The user_id of the user involved in the event.
|
||||
content: The content of the event itself.
|
||||
"""
|
||||
if not self.notify_appservices:
|
||||
return
|
||||
|
||||
logger.debug("Checking interested services for synthetic event %s:%s" % (event_type, user_id))
|
||||
services = self._get_services_for_user_synthetic_event(event_type, user_id)
|
||||
|
||||
if not services:
|
||||
return
|
||||
|
||||
# We only start a new background process if necessary rather than
|
||||
# optimistically (to cut down on overhead).
|
||||
self._notify_synthetic_event(
|
||||
services, user_id, {
|
||||
"type": event_type,
|
||||
"content": content,
|
||||
}
|
||||
)
|
||||
|
||||
@wrap_as_background_process("notify_synthetic_event")
|
||||
async def _notify_synthetic_event(
|
||||
self,
|
||||
services: List[ApplicationService],
|
||||
user_id: UserID,
|
||||
event: JsonDict,
|
||||
) -> None:
|
||||
logger.debug("Submitting synthetic event to interested services %s:%s" % (event["type"], user_id))
|
||||
with Measure(self.clock, "notify_synthetic_event"):
|
||||
for service in services:
|
||||
# TODO: Store event in DB if we can't submit it now.
|
||||
self.scheduler.submit_synthetic_events_for_as(service, [event])
|
||||
|
||||
|
||||
def notify_interested_services_ephemeral(
|
||||
self,
|
||||
stream_key: str,
|
||||
@@ -434,12 +486,16 @@ class ApplicationServicesHandler:
|
||||
|
||||
def _get_services_for_user(self, user_id: str) -> List[ApplicationService]:
|
||||
services = self.store.get_app_services()
|
||||
return [s for s in services if (s.is_interested_in_user(user_id))]
|
||||
return [s for s in services if s.is_interested_in_user(user_id)]
|
||||
|
||||
def _get_services_for_3pn(self, protocol: str) -> List[ApplicationService]:
|
||||
services = self.store.get_app_services()
|
||||
return [s for s in services if s.is_interested_in_protocol(protocol)]
|
||||
|
||||
def _get_services_for_user_synthetic_event(self, event_type: str, user_id: str) -> List[ApplicationService]:
|
||||
services = self.store.get_app_services()
|
||||
return [s for s in services if s.is_interested_in_synthetic_user_event(event_type, user_id)]
|
||||
|
||||
async def _is_unknown_user(self, user_id: str) -> bool:
|
||||
if not self.is_mine_id(user_id):
|
||||
# we don't know if they are unknown or not since it isn't one of our
|
||||
|
||||
@@ -1371,6 +1371,17 @@ class AuthHandler(BaseHandler):
|
||||
access_token=access_token,
|
||||
)
|
||||
|
||||
|
||||
# Inform interested appservices
|
||||
self.hs.get_application_service_handler().notify_synthetic_event(
|
||||
"m.user.logout",
|
||||
user_id,
|
||||
{
|
||||
"user_id": user_info.user_id,
|
||||
"device_id": user_info.device_id,
|
||||
}
|
||||
)
|
||||
|
||||
# delete pushers associated with this access token
|
||||
if user_info.token_id is not None:
|
||||
await self.hs.get_pusherpool().remove_pushers_by_access_token(
|
||||
@@ -1408,6 +1419,16 @@ class AuthHandler(BaseHandler):
|
||||
user_id, (token_id for _, token_id, _ in tokens_and_devices)
|
||||
)
|
||||
|
||||
# Inform interested appservices
|
||||
self.hs.get_application_service_handler().notify_synthetic_event(
|
||||
"m.user.logout",
|
||||
user_id,
|
||||
{
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
|
||||
async def add_threepid(
|
||||
self, user_id: str, medium: str, address: str, validated_at: int
|
||||
) -> None:
|
||||
|
||||
@@ -38,6 +38,7 @@ class DeactivateAccountHandler(BaseHandler):
|
||||
self._room_member_handler = hs.get_room_member_handler()
|
||||
self._identity_handler = hs.get_identity_handler()
|
||||
self._profile_handler = hs.get_profile_handler()
|
||||
self._application_service_handler = hs.get_application_service_handler()
|
||||
self.user_directory_handler = hs.get_user_directory_handler()
|
||||
self._server_name = hs.hostname
|
||||
|
||||
@@ -159,6 +160,16 @@ class DeactivateAccountHandler(BaseHandler):
|
||||
# Mark the user as deactivated.
|
||||
await self.store.set_user_deactivated_status(user_id, True)
|
||||
|
||||
|
||||
# Inform interested appservices
|
||||
self._application_service_handler.notify_synthetic_event(
|
||||
"m.user.deactivated",
|
||||
user_id,
|
||||
{
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
|
||||
return identity_server_supports_unbinding
|
||||
|
||||
async def _reject_pending_invites_for_user(self, user_id: str) -> None:
|
||||
|
||||
@@ -30,6 +30,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
|
||||
super().__init__(hs)
|
||||
self.store = hs.get_datastore()
|
||||
self.registration_handler = hs.get_registration_handler()
|
||||
self._application_service_handler = hs.get_application_service_handler()
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload(
|
||||
@@ -91,6 +92,20 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
|
||||
shadow_banned=content["shadow_banned"],
|
||||
)
|
||||
|
||||
# Inform interested appservices
|
||||
self._application_service_handler.notify_synthetic_event(
|
||||
"m.user.registration",
|
||||
user_id,
|
||||
{
|
||||
"user_id": user_id,
|
||||
"guest": content["make_guest"],
|
||||
"org.matrix.synapse.admin": content["admin"],
|
||||
"org.matrix.synapse.user_type": content["user_type"],
|
||||
"org.matrix.synapse.shadow_banned": content["shadow_banned"],
|
||||
"org.matrix.synapse.appservice_id": content["appservice_id"],
|
||||
}
|
||||
)
|
||||
|
||||
return 200, {}
|
||||
|
||||
|
||||
|
||||
@@ -87,6 +87,7 @@ class LoginRestServlet(RestServlet):
|
||||
|
||||
self.auth_handler = self.hs.get_auth_handler()
|
||||
self.registration_handler = hs.get_registration_handler()
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
self._sso_handler = hs.get_sso_handler()
|
||||
|
||||
self._well_known_builder = WellKnownBuilder(hs)
|
||||
@@ -353,6 +354,17 @@ class LoginRestServlet(RestServlet):
|
||||
if callback is not None:
|
||||
await callback(result)
|
||||
|
||||
# Inform interested appservices
|
||||
self.appservice_handler.notify_synthetic_event(
|
||||
"m.user.login",
|
||||
user_id,
|
||||
{
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
return result
|
||||
|
||||
async def _do_token_login(
|
||||
|
||||
@@ -194,6 +194,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
service: ApplicationService,
|
||||
events: List[EventBase],
|
||||
ephemeral: List[JsonDict],
|
||||
synthetic_events: Optional[List[JsonDict]] = None,
|
||||
) -> AppServiceTransaction:
|
||||
"""Atomically creates a new transaction for this application service
|
||||
with the given list of events. Ephemeral events are NOT persisted to the
|
||||
@@ -233,7 +234,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
(service.id, new_txn_id, event_ids),
|
||||
)
|
||||
return AppServiceTransaction(
|
||||
service=service, id=new_txn_id, events=events, ephemeral=ephemeral
|
||||
service=service, id=new_txn_id, events=events, ephemeral=ephemeral, synthetic_events=synthetic_events
|
||||
)
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
|
||||
Reference in New Issue
Block a user