Compare commits

...

1 Commits

Author SHA1 Message Date
Half-Shot
0ce56923da Update delayed events to support no tokens 2025-10-27 16:46:00 +00:00
4 changed files with 41 additions and 43 deletions

View File

@@ -399,7 +399,7 @@ class DelayedEventsHandler:
if self._next_send_ts_changed(next_send_ts):
self._schedule_next_at(next_send_ts)
async def cancel(self, requester: Requester, delay_id: str) -> None:
async def cancel(self, delay_id: str) -> None:
"""
Cancels the scheduled delivery of the matching delayed event.
@@ -412,20 +412,19 @@ class DelayedEventsHandler:
"""
assert self._is_master
await self._delayed_event_mgmt_ratelimiter.ratelimit(
requester,
(requester.user.to_string(), requester.device_id),
None,
(delay_id),
)
await make_deferred_yieldable(self._initialized_from_db)
next_send_ts = await self._store.cancel_delayed_event(
delay_id=delay_id,
user_localpart=requester.user.localpart,
)
if self._next_send_ts_changed(next_send_ts):
self._schedule_next_at_or_none(next_send_ts)
async def restart(self, requester: Requester, delay_id: str) -> None:
async def restart(self, delay_id: str) -> None:
"""
Restarts the scheduled delivery of the matching delayed event.
@@ -438,26 +437,24 @@ class DelayedEventsHandler:
"""
assert self._is_master
await self._delayed_event_mgmt_ratelimiter.ratelimit(
requester,
(requester.user.to_string(), requester.device_id),
None,
(delay_id),
)
await make_deferred_yieldable(self._initialized_from_db)
next_send_ts = await self._store.restart_delayed_event(
delay_id=delay_id,
user_localpart=requester.user.localpart,
current_ts=self._get_current_ts(),
)
if self._next_send_ts_changed(next_send_ts):
self._schedule_next_at(next_send_ts)
async def send(self, requester: Requester, delay_id: str) -> None:
async def send(self, delay_id: str) -> None:
"""
Immediately sends the matching delayed event, instead of waiting for its scheduled delivery.
Args:
requester: The owner of the delayed event to act on.
delay_id: The ID of the delayed event to act on.
Raises:
@@ -466,28 +463,21 @@ class DelayedEventsHandler:
assert self._is_master
# Use standard request limiter for sending delayed events on-demand,
# as an on-demand send is similar to sending a regular event.
await self._request_ratelimiter.ratelimit(requester)
await make_deferred_yieldable(self._initialized_from_db)
await self._delayed_event_mgmt_ratelimiter.ratelimit(
None,
(delay_id),
)
event, next_send_ts = await self._store.process_target_delayed_event(
delay_id=delay_id,
user_localpart=requester.user.localpart,
)
if self._next_send_ts_changed(next_send_ts):
self._schedule_next_at_or_none(next_send_ts)
await self._send_event(
DelayedEventDetails(
delay_id=DelayID(delay_id),
user_localpart=UserLocalpart(requester.user.localpart),
room_id=event.room_id,
type=event.type,
state_key=event.state_key,
origin_server_ts=event.origin_server_ts,
content=event.content,
device_id=event.device_id,
)
event
)
async def _send_on_timeout(self) -> None:

View File

@@ -53,8 +53,6 @@ class UpdateDelayedEventServlet(RestServlet):
async def on_POST(
self, request: SynapseRequest, delay_id: str
) -> tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
body = parse_json_object_from_request(request)
try:
action = str(body["action"])
@@ -75,11 +73,11 @@ class UpdateDelayedEventServlet(RestServlet):
)
if enum_action == _UpdateDelayedEventAction.CANCEL:
await self.delayed_events_handler.cancel(requester, delay_id)
await self.delayed_events_handler.cancel(delay_id)
elif enum_action == _UpdateDelayedEventAction.RESTART:
await self.delayed_events_handler.restart(requester, delay_id)
await self.delayed_events_handler.restart(delay_id)
elif enum_action == _UpdateDelayedEventAction.SEND:
await self.delayed_events_handler.send(requester, delay_id)
await self.delayed_events_handler.send(delay_id)
return 200, {}

View File

@@ -110,7 +110,6 @@ class DelayedEventsStore(SQLBaseStore):
table="delayed_events",
values={
"delay_id": delay_id,
"user_localpart": user_localpart,
"device_id": device_id,
"delay": delay,
"send_ts": send_ts,
@@ -136,7 +135,6 @@ class DelayedEventsStore(SQLBaseStore):
self,
*,
delay_id: str,
user_localpart: str,
current_ts: Timestamp,
) -> Timestamp:
"""
@@ -145,7 +143,6 @@ class DelayedEventsStore(SQLBaseStore):
Args:
delay_id: The ID of the delayed event to restart.
user_localpart: The localpart of the delayed event's owner.
current_ts: The current time, which will be used to calculate the new send time.
Returns: The send time of the next delayed event to be sent,
@@ -163,13 +160,11 @@ class DelayedEventsStore(SQLBaseStore):
"""
UPDATE delayed_events
SET send_ts = ? + delay
WHERE delay_id = ? AND user_localpart = ?
AND NOT is_processed
WHERE delay_id = ? AND NOT is_processed
""",
(
current_ts,
delay_id,
user_localpart,
),
)
if txn.rowcount == 0:
@@ -321,9 +316,8 @@ class DelayedEventsStore(SQLBaseStore):
self,
*,
delay_id: str,
user_localpart: str,
) -> tuple[
EventDetails,
DelayedEventDetails,
Optional[Timestamp],
]:
"""
@@ -332,7 +326,6 @@ class DelayedEventsStore(SQLBaseStore):
Args:
delay_id: The ID of the delayed event to restart.
user_localpart: The localpart of the delayed event's owner.
Returns: The details of the matching delayed event,
and the send time of the next delayed event to be sent, if any.
@@ -351,7 +344,7 @@ class DelayedEventsStore(SQLBaseStore):
"""
UPDATE delayed_events
SET is_processed = TRUE
WHERE delay_id = ? AND user_localpart = ?
WHERE delay_id = ?
AND NOT is_processed
RETURNING
room_id,
@@ -359,24 +352,26 @@ class DelayedEventsStore(SQLBaseStore):
state_key,
origin_server_ts,
content,
device_id
device_id,
user_localpart
""",
(
delay_id,
user_localpart,
),
)
row = txn.fetchone()
if row is None:
raise NotFoundError("Delayed event not found")
event = EventDetails(
event = DelayedEventDetails(
RoomID.from_string(row[0]),
EventType(row[1]),
StateKey(row[2]) if row[2] is not None else None,
Timestamp(row[3]) if row[3] is not None else None,
db_to_json(row[4]),
DeviceID(row[5]) if row[5] is not None else None,
DelayID(delay_id),
UserLocalpart(row[6]),
)
return event, self._get_next_delayed_event_send_ts_txn(txn)
@@ -388,8 +383,7 @@ class DelayedEventsStore(SQLBaseStore):
async def cancel_delayed_event(
self,
*,
delay_id: str,
user_localpart: str,
delay_id: str
) -> Optional[Timestamp]:
"""
Cancels the matching delayed event, i.e. remove it as long as it hasn't been processed.
@@ -413,7 +407,6 @@ class DelayedEventsStore(SQLBaseStore):
table="delayed_events",
keyvalues={
"delay_id": delay_id,
"user_localpart": user_localpart,
"is_processed": False,
},
)

View File

@@ -0,0 +1,17 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 Element Creations, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Remove user_localpart from primary key.
ALTER TABLE delayed_events DROP CONSTRAINT delayed_events_pkey;
ALTER TABLE delayed_events ADD PRIMARY KEY (delay_id);