Compare commits

...

2 Commits

Author SHA1 Message Date
Will Hunt
ba449705d1 Add hooks 2021-09-24 14:04:06 +01:00
Will Hunt
9c4d018e4e Setup synthetic_events structure 2021-09-24 14:00:58 +01:00
9 changed files with 163 additions and 12 deletions

View File

@@ -187,6 +187,7 @@ class ApplicationService:
for user_id in member_list:
if self.is_interested_in_user(user_id):
return True
return False
def _matches_room_id(self, event: EventBase) -> bool:
@@ -233,6 +234,15 @@ class ApplicationService:
return False
def is_interested_in_synthetic_user_event(self, event_type: str, user_id: UserID):
for regex_obj in self.namespaces["users"]:
if not regex_obj["regex"].match(user_id):
continue
# TODO: Validate structure further up.
if event_type in regex_obj.get("uk.half-shot.msc3395.synthetic_events", {"events": []})["events"]:
return True
return False
@cached(num_args=1)
async def is_interested_in_presence(
self, user_id: UserID, store: "DataStore"
@@ -258,10 +268,10 @@ class ApplicationService:
return False
def is_interested_in_user(self, user_id: str) -> bool:
return (
bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
or user_id == self.sender
)
if user_id == self.sender:
return True
regex_obj = self._matches_regex(user_id, ApplicationService.NS_USERS)
return regex_obj and regex_obj.get("uk.half-shot.msc3395.events", True)
def is_interested_in_alias(self, alias: str) -> bool:
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
@@ -329,11 +339,13 @@ class AppServiceTransaction:
id: int,
events: List[EventBase],
ephemeral: List[JsonDict],
synthetic_events: Optional[JsonDict] = None,
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
self.synthetic_events = synthetic_events
async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
@@ -347,6 +359,7 @@ class AppServiceTransaction:
service=self.service,
events=self.events,
ephemeral=self.ephemeral,
synthetic_events=self.synthetic_events,
txn_id=self.id,
)

View File

@@ -203,6 +203,7 @@ class ApplicationServiceApi(SimpleHttpClient):
service: "ApplicationService",
events: List[EventBase],
ephemeral: List[JsonDict],
synthetic_events: Optional[List[JsonDict]],
txn_id: Optional[int] = None,
):
if service.url is None:
@@ -218,11 +219,15 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
body = {"events": events}
# Never send ephemeral events to appservices that do not support it
if service.supports_ephemeral:
body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
else:
body = {"events": events}
body["de.sorunome.msc2409.ephemeral"] = ephemeral
# We will only populate this if the appservice requests synthetic events
if synthetic_events and len(synthetic_events):
body["uk.half-shot.msc3395.synthetic_events"] = synthetic_events
try:
await self.put_json(

View File

@@ -65,6 +65,8 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
# Maximum number of ephemeral events to provide in an AS transaction.
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
# Maximum number of syntethci events to provide in an AS transaction.
MAX_SYNTHETIC_EVENTS_PER_TRANSACTION = 100
class ApplicationServiceScheduler:
"""Public facing API for this module. Does the required DI to tie the
@@ -99,6 +101,11 @@ class ApplicationServiceScheduler:
):
self.queuer.enqueue_ephemeral(service, events)
def submit_synthetic_events_for_as(
self, service: ApplicationService, events: List[JsonDict]
):
self.queuer.enqueue_ephemeral(service, events)
class _ServiceQueuer:
"""Queue of events waiting to be sent to appservices.
@@ -111,6 +118,7 @@ class _ServiceQueuer:
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
self.queued_ephemeral = {} # dict of {service_id: [events]}
self.queued_synthetic = {} # dict of {service_id: [events]}
# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
@@ -134,6 +142,10 @@ class _ServiceQueuer:
self.queued_ephemeral.setdefault(service.id, []).extend(events)
self._start_background_request(service)
def enqueue_syntheic(self, service: ApplicationService, events: List[JsonDict]):
self.queued_synthetic.setdefault(service.id, []).extend(events)
self._start_background_request(service)
async def _send_request(self, service: ApplicationService):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
@@ -150,11 +162,15 @@ class _ServiceQueuer:
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
if not events and not ephemeral:
all_events_synthetic = self.queued_synthetic.get(service.id, [])
synthetic = all_events_ephemeral[:MAX_SYNTHETIC_EVENTS_PER_TRANSACTION]
del all_events_synthetic[:MAX_SYNTHETIC_EVENTS_PER_TRANSACTION]
if not events and not ephemeral and not synthetic:
return
try:
await self.txn_ctrl.send(service, events, ephemeral)
await self.txn_ctrl.send(service, events, ephemeral, synthetic)
except Exception:
logger.exception("AS request failed")
finally:
@@ -191,10 +207,11 @@ class _TransactionController:
service: ApplicationService,
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
synthetic_events: Optional[List[JsonDict]] = None,
):
try:
txn = await self.store.create_appservice_txn(
service=service, events=events, ephemeral=ephemeral or []
service=service, events=events, ephemeral=ephemeral or [], synthetic_events=synthetic_events
)
service_is_up = await self._is_service_up(service)
if service_is_up:

View File

@@ -179,6 +179,58 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
def notify_synthetic_event(
self,
event_type: str,
user_id: UserID,
content: JsonDict,
) -> None:
"""This is called when another service wishes to
notify about a synthetic event.
This will determine which appservices
are interested in the event, and submit them.
Events will only be pushed to appservices
that have opted into ephemeral events
Args:
event_type: The type of event to notify about.
user_id: The user_id of the user involved in the event.
content: The content of the event itself.
"""
if not self.notify_appservices:
return
logger.debug("Checking interested services for synthetic event %s:%s" % (event_type, user_id))
services = self._get_services_for_user_synthetic_event(event_type, user_id)
if not services:
return
# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_synthetic_event(
services, user_id, {
"type": event_type,
"content": content,
}
)
@wrap_as_background_process("notify_synthetic_event")
async def _notify_synthetic_event(
self,
services: List[ApplicationService],
user_id: UserID,
event: JsonDict,
) -> None:
logger.debug("Submitting synthetic event to interested services %s:%s" % (event["type"], user_id))
with Measure(self.clock, "notify_synthetic_event"):
for service in services:
# TODO: Store event in DB if we can't submit it now.
self.scheduler.submit_synthetic_events_for_as(service, [event])
def notify_interested_services_ephemeral(
self,
stream_key: str,
@@ -434,12 +486,16 @@ class ApplicationServicesHandler:
def _get_services_for_user(self, user_id: str) -> List[ApplicationService]:
services = self.store.get_app_services()
return [s for s in services if (s.is_interested_in_user(user_id))]
return [s for s in services if s.is_interested_in_user(user_id)]
def _get_services_for_3pn(self, protocol: str) -> List[ApplicationService]:
services = self.store.get_app_services()
return [s for s in services if s.is_interested_in_protocol(protocol)]
def _get_services_for_user_synthetic_event(self, event_type: str, user_id: str) -> List[ApplicationService]:
services = self.store.get_app_services()
return [s for s in services if s.is_interested_in_synthetic_user_event(event_type, user_id)]
async def _is_unknown_user(self, user_id: str) -> bool:
if not self.is_mine_id(user_id):
# we don't know if they are unknown or not since it isn't one of our

View File

@@ -1371,6 +1371,17 @@ class AuthHandler(BaseHandler):
access_token=access_token,
)
# Inform interested appservices
self.hs.get_application_service_handler().notify_synthetic_event(
"m.user.logout",
user_id,
{
"user_id": user_info.user_id,
"device_id": user_info.device_id,
}
)
# delete pushers associated with this access token
if user_info.token_id is not None:
await self.hs.get_pusherpool().remove_pushers_by_access_token(
@@ -1408,6 +1419,16 @@ class AuthHandler(BaseHandler):
user_id, (token_id for _, token_id, _ in tokens_and_devices)
)
# Inform interested appservices
self.hs.get_application_service_handler().notify_synthetic_event(
"m.user.logout",
user_id,
{
"user_id": user_id,
"device_id": device_id,
}
)
async def add_threepid(
self, user_id: str, medium: str, address: str, validated_at: int
) -> None:

View File

@@ -38,6 +38,7 @@ class DeactivateAccountHandler(BaseHandler):
self._room_member_handler = hs.get_room_member_handler()
self._identity_handler = hs.get_identity_handler()
self._profile_handler = hs.get_profile_handler()
self._application_service_handler = hs.get_application_service_handler()
self.user_directory_handler = hs.get_user_directory_handler()
self._server_name = hs.hostname
@@ -159,6 +160,16 @@ class DeactivateAccountHandler(BaseHandler):
# Mark the user as deactivated.
await self.store.set_user_deactivated_status(user_id, True)
# Inform interested appservices
self._application_service_handler.notify_synthetic_event(
"m.user.deactivated",
user_id,
{
"user_id": user_id,
}
)
return identity_server_supports_unbinding
async def _reject_pending_invites_for_user(self, user_id: str) -> None:

View File

@@ -30,6 +30,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
super().__init__(hs)
self.store = hs.get_datastore()
self.registration_handler = hs.get_registration_handler()
self._application_service_handler = hs.get_application_service_handler()
@staticmethod
async def _serialize_payload(
@@ -91,6 +92,20 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
shadow_banned=content["shadow_banned"],
)
# Inform interested appservices
self._application_service_handler.notify_synthetic_event(
"m.user.registration",
user_id,
{
"user_id": user_id,
"guest": content["make_guest"],
"org.matrix.synapse.admin": content["admin"],
"org.matrix.synapse.user_type": content["user_type"],
"org.matrix.synapse.shadow_banned": content["shadow_banned"],
"org.matrix.synapse.appservice_id": content["appservice_id"],
}
)
return 200, {}

View File

@@ -87,6 +87,7 @@ class LoginRestServlet(RestServlet):
self.auth_handler = self.hs.get_auth_handler()
self.registration_handler = hs.get_registration_handler()
self.appservice_handler = hs.get_application_service_handler()
self._sso_handler = hs.get_sso_handler()
self._well_known_builder = WellKnownBuilder(hs)
@@ -353,6 +354,17 @@ class LoginRestServlet(RestServlet):
if callback is not None:
await callback(result)
# Inform interested appservices
self.appservice_handler.notify_synthetic_event(
"m.user.login",
user_id,
{
"user_id": user_id,
"device_id": device_id,
}
)
return result
async def _do_token_login(

View File

@@ -194,6 +194,7 @@ class ApplicationServiceTransactionWorkerStore(
service: ApplicationService,
events: List[EventBase],
ephemeral: List[JsonDict],
synthetic_events: Optional[List[JsonDict]] = None,
) -> AppServiceTransaction:
"""Atomically creates a new transaction for this application service
with the given list of events. Ephemeral events are NOT persisted to the
@@ -233,7 +234,7 @@ class ApplicationServiceTransactionWorkerStore(
(service.id, new_txn_id, event_ids),
)
return AppServiceTransaction(
service=service, id=new_txn_id, events=events, ephemeral=ephemeral
service=service, id=new_txn_id, events=events, ephemeral=ephemeral, synthetic_events=synthetic_events
)
return await self.db_pool.runInteraction(