Mirror of https://github.com/element-hq/synapse.git (synced 2025-12-07 01:20:16 +00:00)

Compare commits: bbz/improv...hs/send-le (12 commits)
Commits in this comparison:

- 0b997bdd6e
- 613a7ea847
- 5c91913f5a
- c810460e40
- e4312d9af8
- 7a91519fe8
- e6fcab4309
- 2160889a3a
- 0355e6fd6d
- 1904389b5d
- 8f33017653
- b590056ba7
@@ -345,7 +345,7 @@ def check_state_dependent_auth_rules(
         logger.debug("Allowing! %s", event)
         return

-    # 5. If type is m.room.membership
+    # 5. If type is m.room.member
     if event.type == EventTypes.Member:
         _is_membership_change_allowed(event.room_version, event, auth_dict)
         logger.debug("Allowing! %s", event)

@@ -36,7 +36,7 @@ from prometheus_client import Counter
 from twisted.internet import defer

 import synapse
-from synapse.api.constants import EduTypes, EventTypes
+from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.appservice import ApplicationService
 from synapse.events import EventBase
 from synapse.handlers.presence import format_user_presence_state
@@ -139,6 +139,24 @@ class ApplicationServicesHandler:

         events_by_room: Dict[str, List[EventBase]] = {}
         for event in events:
+            # We never want to send leave events from deleted rooms to the AS since the events
+            # may have been purged by the time it's pushed to the AS. Instead, we handle this
+            # elsewhere.
+            membership = event.content.get("membership")
+            state_key = event.get_state_key()
+            if (
+                state_key is not None
+                and event.type == EventTypes.Member
+                and membership == Membership.LEAVE
+            ):
+                if await self.store.has_room_been_deleted(event.room_id):
+                    logger.debug(
+                        "Filtering %s from appservice as it's a leave event (%s) from deleted room %s",
+                        event.event_id,
+                        state_key,
+                        event.room_id,
+                    )
+                    continue
             events_by_room.setdefault(event.room_id, []).append(event)

         async def handle_event(event: EventBase) -> None:
@@ -222,6 +240,18 @@ class ApplicationServicesHandler:
         finally:
             self.is_processing = False

+    async def notify_room_deletion(
+        self, room_id: str, deleted_stream_id: int, users: Iterable[str]
+    ) -> None:
+        services = self.store.get_app_services()
+        for service in services:
+            if any(
+                service.is_interested_in_user(user_id) for user_id in users
+            ) or await service.is_interested_in_room(room_id, self.store):
+                await self.store.create_appservice_stream_id_txn(
+                    service, deleted_stream_id
+                )
+
     def notify_interested_services_ephemeral(
         self,
         stream_key: StreamKeyType,

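Taken together, the two hunks above (apparently synapse/handlers/appservice.py) do two things: they stop pushing m.room.member leave events for rooms that have already been deleted, since the underlying events may have been purged, and they add notify_room_deletion(), which records a stream-id-only transaction for each interested application service. The sketch below is a minimal, standalone restatement of the filtering decision under those assumptions; FakeStore, FakeEvent and skip_for_appservice are illustrative stand-ins, not Synapse APIs.

```python
# Standalone sketch of the filtering decision added above -- not Synapse code.
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Optional, Set


@dataclass
class FakeEvent:
    event_id: str
    room_id: str
    type: str
    state_key: Optional[str] = None
    content: Dict[str, str] = field(default_factory=dict)


@dataclass
class FakeStore:
    deleted_rooms: Set[str] = field(default_factory=set)

    async def has_room_been_deleted(self, room_id: str) -> bool:
        return room_id in self.deleted_rooms


async def skip_for_appservice(store: FakeStore, event: FakeEvent) -> bool:
    """True if the event is a leave in a deleted room and should not be pushed."""
    if event.state_key is None or event.type != "m.room.member":
        return False
    if event.content.get("membership") != "leave":
        return False
    # Leaves from deleted rooms are handled via the stream-id catch-up txn instead.
    return await store.has_room_been_deleted(event.room_id)


async def main() -> None:
    store = FakeStore(deleted_rooms={"!gone:example.org"})
    leave = FakeEvent("$1", "!gone:example.org", "m.room.member",
                      "@alice:example.org", {"membership": "leave"})
    join = FakeEvent("$2", "!live:example.org", "m.room.member",
                     "@alice:example.org", {"membership": "join"})
    print(await skip_for_appservice(store, leave))  # True -> filtered out
    print(await skip_for_appservice(store, join))   # False -> delivered as usual


asyncio.run(main())
```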
@@ -1801,6 +1801,7 @@ class RoomShutdownHandler:
         self._replication = hs.get_replication_data_handler()
         self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.as_handler = hs.get_application_service_handler()
         self.store = hs.get_datastores().main

     async def shutdown_room(
@@ -1920,12 +1921,23 @@ class RoomShutdownHandler:
         else:
             logger.info("Shutting down room %r", room_id)

-            users = await self.store.get_local_users_related_to_room(room_id)
-            for user_id, membership in users:
-                # If the user is not in the room (or is banned), nothing to do.
-                if membership not in (Membership.JOIN, Membership.INVITE, Membership.KNOCK):
-                    continue
+            # If the user is not in the room (or is banned), nothing to do.
+            users = [
+                user
+                for user in await self.store.get_local_users_related_to_room(room_id)
+                if user[1] in (Membership.JOIN, Membership.INVITE, Membership.KNOCK)
+            ]
+
+            # When deleting a room, we want to store the local membership state so that we
+            # can still send synthetic leaves down sync after the room has been purged (if indeed it has).
+            # We must do this prior to kicking as otherwise the current_state_events
+            # table will be empty.
+            delete_stream_id = await self.store.store_deleted_room_members(room_id)
+            await self.as_handler.notify_room_deletion(
+                room_id, delete_stream_id, [user_id for user_id, _membership in users]
+            )
+
+            for user_id, membership in users:
                 logger.info("Kicking %r from %r...", user_id, room_id)

                 try:

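These hunks (presumably RoomShutdownHandler in synapse/handlers/room.py) wire room shutdown into the new machinery: snapshot the local members first, notify interested application services with the returned stream id, and only then kick users, because kicking empties current_state_events. A minimal sketch of that ordering with stubbed dependencies follows; FakeStore and FakeAsHandler are illustrative stand-ins, not Synapse classes.

```python
# Standalone sketch of the shutdown ordering introduced above -- not Synapse code.
# Only the order of operations matters: snapshot members -> notify ASes -> kick.
import asyncio
from typing import Dict, List, Tuple


class FakeStore:
    def __init__(self) -> None:
        self.current_state: Dict[str, List[Tuple[str, str]]] = {
            "!room:example.org": [("@alice:example.org", "join"), ("@bob:example.org", "ban")]
        }
        self.deleted_room_members: List[Tuple[str, str, int]] = []
        self._stream = 100

    async def get_local_users_related_to_room(self, room_id: str) -> List[Tuple[str, str]]:
        return self.current_state.get(room_id, [])

    async def store_deleted_room_members(self, room_id: str) -> int:
        # Copy the members while current_state is still populated; after kicking it would be empty.
        for user_id, _membership in self.current_state.get(room_id, []):
            self.deleted_room_members.append((room_id, user_id, self._stream))
        return self._stream


class FakeAsHandler:
    async def notify_room_deletion(self, room_id: str, stream_id: int, users: List[str]) -> None:
        print(f"AS catch-up txn for {room_id} at stream {stream_id} covering {users}")


async def shutdown_room(store: FakeStore, as_handler: FakeAsHandler, room_id: str) -> None:
    users = [
        u for u in await store.get_local_users_related_to_room(room_id)
        if u[1] in ("join", "invite", "knock")
    ]
    # 1) Snapshot membership while the state tables still have it.
    stream_id = await store.store_deleted_room_members(room_id)
    # 2) Tell application services about the deletion.
    await as_handler.notify_room_deletion(room_id, stream_id, [u for u, _ in users])
    # 3) Only now kick the remaining users.
    for user_id, _membership in users:
        print(f"kicking {user_id} from {room_id}")


asyncio.run(shutdown_room(FakeStore(), FakeAsHandler(), "!room:example.org"))
```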
@@ -1235,6 +1235,10 @@ class SlidingSyncRoomLists:
             )
         )

+        deleted_rooms = await self.store.get_deleted_rooms_for_user(
+            user_id, to_token.room_key.stream
+        )
+
         # 1) Assemble a list of the last membership events in some given ranges. Someone
         # could have left and joined multiple times during the given range but we only
         # care about end-result so we grab the last one.
@@ -1295,6 +1299,32 @@ class SlidingSyncRoomLists:
                 event_pos=last_membership_change_in_from_to_range.event_pos,
                 room_version_id=await self.store.get_room_version_id(room_id),
             )
+        for room_id, room_version_id, stream_pos in deleted_rooms:
+            if room_id in newly_left_room_map:
+                logger.info(
+                    "Room %s already in newly left list, not handling for %s",
+                    room_id,
+                    user_id,
+                )
+                # It's possible that if the user is syncing at the same time the room is deleted then they will
+                # see a genuine leave event from the room, so we don't need a synthetic leave.
+                continue
+            # Otherwise, generate a synthetic leave to tell clients that the room has been deleted.
+            logger.info(
+                "Generating synthetic leave for %s in %s as room was deleted.",
+                user_id,
+                room_id,
+            )
+            # Note we use RoomsForUserStateReset here as it's ideal for the purpose of a deleted
+            # room where the event that caused us to leave no longer exists.
+            newly_left_room_map[room_id] = RoomsForUserStateReset(
+                room_id=room_id,
+                sender=None,
+                membership=Membership.LEAVE,
+                event_id=None,
+                event_pos=stream_pos,
+                room_version_id=room_version_id,
+            )

         # 2) Figure out `newly_joined`
         for room_id in possibly_newly_joined_room_ids:

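The sliding-sync hunks above (presumably synapse/handlers/sliding_sync/room_lists.py) apply one rule: a deleted room only gets a synthetic entry in the newly-left map if the user does not already have a genuine leave for it. A small standalone sketch of that merge rule follows; SyntheticLeave is an illustrative stand-in for Synapse's RoomsForUserStateReset.

```python
# Standalone sketch of the "merge deleted rooms into the newly-left map" rule -- not Synapse code.
from typing import Dict, List, NamedTuple, Optional, Tuple


class SyntheticLeave(NamedTuple):
    room_id: str
    sender: Optional[str]
    membership: str
    event_id: Optional[str]
    stream_pos: int
    room_version_id: str


def merge_deleted_rooms(
    newly_left: Dict[str, SyntheticLeave],
    deleted_rooms: List[Tuple[str, str, int]],
) -> Dict[str, SyntheticLeave]:
    for room_id, room_version_id, stream_pos in deleted_rooms:
        if room_id in newly_left:
            # The user saw a genuine leave while the room was being deleted.
            continue
        newly_left[room_id] = SyntheticLeave(
            room_id=room_id,
            sender=None,          # no real event exists any more
            membership="leave",
            event_id=None,
            stream_pos=stream_pos,
            room_version_id=room_version_id,
        )
    return newly_left


genuine = SyntheticLeave("!a:x", "@alice:x", "leave", "$evt", 41, "10")
result = merge_deleted_rooms({"!a:x": genuine}, [("!a:x", "10", 42), ("!b:x", "11", 42)])
print(sorted(result))  # ['!a:x', '!b:x'] -- "!a:x" keeps its genuine leave entry
```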
@@ -52,7 +52,7 @@ from synapse.api.constants import (
 from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.events import EventBase
+from synapse.events import EventBase, make_event_from_dict
 from synapse.handlers.relations import BundledAggregations
 from synapse.logging import issue9533_logger
 from synapse.logging.context import current_context
@@ -1829,7 +1829,7 @@ class SyncHandler:
             full_state,
         )

-        logger.debug(
+        logger.info(
             "Calculating sync response for %r between %s and %s",
             sync_config.user,
             sync_result_builder.since_token,
@@ -2386,6 +2386,9 @@ class SyncHandler:

         since_token = sync_result_builder.since_token
         user_id = sync_result_builder.sync_config.user.to_string()
+        logger.info(
+            "Generating _generate_sync_entry_for_rooms for %s %s", user_id, since_token
+        )

         blocks_all_rooms = (
             sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
@@ -2427,19 +2430,22 @@ class SyncHandler:
         # no point in going further.
         if not sync_result_builder.full_state:
             if since_token and not ephemeral_by_room and not account_data_by_room:
-                have_changed = await self._have_rooms_changed(sync_result_builder)
+                have_changed = await self._have_rooms_changed(
+                    sync_result_builder, user_id
+                )
                 log_kv({"rooms_have_changed": have_changed})
                 if not have_changed:
                     tags_by_room = await self.store.get_updated_tags(
                         user_id, since_token.account_data_key
                     )
                     if not tags_by_room:
-                        logger.debug("no-oping sync")
+                        logger.info("no-oping sync")
                         return set(), set()

         # 3. Work out which rooms need reporting in the sync response.
         ignored_users = await self.store.ignored_users(user_id)
         if since_token:
+            logger.info("With since_token %s %s", user_id, since_token)
             room_changes = await self._get_room_changes_for_incremental_sync(
                 sync_result_builder, ignored_users
             )
@@ -2484,7 +2490,7 @@ class SyncHandler:
         return set(newly_joined_rooms), set(newly_left_rooms)

     async def _have_rooms_changed(
-        self, sync_result_builder: "SyncResultBuilder"
+        self, sync_result_builder: "SyncResultBuilder", user_id: str
     ) -> bool:
         """Returns whether there may be any new events that should be sent down
         the sync. Returns True if there are.
@@ -2499,6 +2505,13 @@ class SyncHandler:
         if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
             return True

+        # If we have any deleted rooms to send down sync (which do not appear down the event paths)
+        # then also emit a room change.
+        if await self.store.has_deleted_rooms_for_user(
+            user_id, since_token.room_key.stream
+        ):
+            return True
+
         stream_id = since_token.room_key.stream
         for room_id in sync_result_builder.joined_room_ids:
             if self.store.has_room_changed_since(room_id, stream_id):
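The sync.py hunks above culminate in _have_rooms_changed gaining a deleted-rooms check: an incremental sync can no longer be no-oped when the only pending change is a room deletion, which never shows up as a new event. Below is a hedged, standalone sketch of that short-circuit; FakeStore and have_rooms_changed are illustrative stand-ins, not the real SyncHandler code.

```python
# Standalone sketch of the _have_rooms_changed short-circuit above -- not Synapse code.
import asyncio
from typing import Dict, List, Set


class FakeStore:
    def __init__(self) -> None:
        self.deleted_after: Dict[str, int] = {"@alice:example.org": 105}
        self.room_changes: Dict[str, int] = {}

    async def has_deleted_rooms_for_user(self, user_id: str, stream_pos: int) -> bool:
        deleted_at = self.deleted_after.get(user_id)
        return deleted_at is not None and stream_pos < deleted_at

    def has_room_changed_since(self, room_id: str, stream_id: int) -> bool:
        return self.room_changes.get(room_id, 0) > stream_id


async def have_rooms_changed(
    store: FakeStore,
    user_id: str,
    since_stream: int,
    membership_changes: List[str],
    joined_room_ids: Set[str],
) -> bool:
    if membership_changes:
        return True
    # New: deleted rooms never appear as events, so check them explicitly.
    if await store.has_deleted_rooms_for_user(user_id, since_stream):
        return True
    return any(store.has_room_changed_since(r, since_stream) for r in joined_room_ids)


# True even though there are no membership changes and no joined-room activity.
print(asyncio.run(have_rooms_changed(FakeStore(), "@alice:example.org", 100, [], set())))
```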
@@ -2757,6 +2770,56 @@ class SyncHandler:

             room_entries.append(entry)

+        deleted_left_rooms = await self.store.get_deleted_rooms_for_user(
+            user_id, since_token.room_key.stream
+        )
+
+        for room_id, room_version, deleted_stream_id in deleted_left_rooms:
+            if room_id in newly_left_rooms:
+                # It's possible that if the user is syncing at the same time the room is deleted then they will
+                # see a genuine leave event from the room, so we don't need a synthetic leave.
+                continue
+            # Otherwise, generate a synthetic leave to tell clients that the room has been deleted.
+            logger.info(
+                "Generating synthetic leave for %s in %s as room was deleted.",
+                user_id,
+                room_id,
+            )
+            # Synthetic leaves for deleted rooms
+            leave_evt = make_event_from_dict(
+                {
+                    "state_key": user_id,
+                    "sender": user_id,
+                    "room_id": room_id,
+                    "type": "m.room.member",
+                    "content": {
+                        "membership": "leave",
+                        "reason": "The room has been deleted",
+                    },
+                },
+                # The room itself is gone, so the version comes from the deleted_room_members snapshot.
+                KNOWN_ROOM_VERSIONS[room_version],
+            )
+            # Ensure the event is treated as an outlier since we are not persisting this!
+            leave_evt.internal_metadata.outlier = True
+            leave_evt.internal_metadata.out_of_band_membership = True
+            leave_evt.internal_metadata.stream_ordering = deleted_stream_id.stream
+
+            room_entries.append(
+                RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="archived",
+                    events=[leave_evt],
+                    newly_joined=False,
+                    full_state=False,
+                    since_token=since_token,
+                    upto_token=since_token,
+                    end_token=since_token,
+                    out_of_band=True,
+                )
+            )
+            newly_left_rooms.append(room_id)

         return _RoomChanges(
             room_entries,
             invited,

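The last sync.py hunk builds a synthetic, never-persisted m.room.member leave for each deleted room and marks it as an outlier / out-of-band membership anchored at the deletion stream position. The sketch below restates the shape of that event and its metadata with plain dicts; build_synthetic_leave and the metadata dict are illustrative stand-ins for make_event_from_dict and the event's internal_metadata, not the actual Synapse API.

```python
# Standalone sketch of the synthetic leave built above -- not Synapse code.
from typing import Any, Dict, Tuple


def build_synthetic_leave(user_id: str, room_id: str, stream_ordering: int) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    event = {
        "state_key": user_id,
        "sender": user_id,  # nobody else can "send" it: the room is gone
        "room_id": room_id,
        "type": "m.room.member",
        "content": {
            "membership": "leave",
            "reason": "The room has been deleted",
        },
    }
    metadata = {
        # Never persisted, so it must be treated as an outlier...
        "outlier": True,
        # ...and as an out-of-band membership, since it is not part of the room's event graph.
        "out_of_band_membership": True,
        # Anchor it at the stream position recorded when the room was deleted.
        "stream_ordering": stream_ordering,
    }
    return event, metadata


event, metadata = build_synthetic_leave("@alice:example.org", "!gone:example.org", 105)
print(event["content"]["membership"], metadata["outlier"])  # leave True
```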
@@ -30,7 +30,7 @@ from synapse.appservice import (
     TransactionUnusedFallbackKeys,
 )
 from synapse.config.appservice import load_appservices
-from synapse.events import EventBase
+from synapse.events import EventBase, make_event_from_dict
 from synapse.storage._base import db_to_json
 from synapse.storage.database import (
     DatabasePool,
@@ -319,6 +319,75 @@ class ApplicationServiceTransactionWorkerStore(
             "create_appservice_txn", _create_appservice_txn
         )

+    async def get_deleted_room_members_for_appservice(
+        self, room_stream_id: int
+    ) -> List[EventBase]:
+        """Build synthetic leave events for the local members of rooms deleted
+        at the given room stream position."""
+        # If we have a room_stream_id, let's see if there are any deleted events.
+        events: List[EventBase] = []
+        logger.info("Processing deleted rooms stream ID %s", room_stream_id)
+        rooms = await self.get_deleted_room_members_at(room_stream_id)
+        for room_id, room_data in rooms.items():
+            room_version, members = room_data
+            for user_id in members:
+                leave_evt = make_event_from_dict(
+                    {
+                        "state_key": user_id,
+                        "sender": user_id,
+                        "room_id": room_id,
+                        "type": "m.room.member",
+                        "content": {
+                            "membership": "leave",
+                            "reason": "The room has been deleted",
+                        },
+                    },
+                    # Map the stored version string to a RoomVersion
+                    # (KNOWN_ROOM_VERSIONS from synapse.api.room_versions).
+                    KNOWN_ROOM_VERSIONS[room_version],
+                )
+                events.append(leave_evt)
+        return events
+
+    async def create_appservice_stream_id_txn(
+        self,
+        service: ApplicationService,
+        stream_id: int,
+    ) -> AppServiceTransaction:
+        """Atomically creates a new transaction for this application service
+        with the given room stream token.
+
+        Args:
+            service: The service who the transaction is for.
+            stream_id: The room stream position at which the rooms were deleted.
+
+        Returns:
+            A new transaction.
+        """
+
+        events = await self.get_deleted_room_members_for_appservice(stream_id)
+
+        def _create_appservice_txn(txn: LoggingTransaction) -> AppServiceTransaction:
+            new_txn_id = self._as_txn_seq_gen.get_next_id_txn(txn)
+
+            # Insert new txn into txn table
+            txn.execute(
+                "INSERT INTO application_services_txns(as_id, txn_id, event_ids, stream_id) "
+                "VALUES(?,?,?,?)",
+                (service.id, new_txn_id, "[]", stream_id),  # "[]" = JSON-encoded empty event list
+            )
+
+            return AppServiceTransaction(
+                service=service,
+                id=new_txn_id,
+                events=events,
+                ephemeral=[],
+                to_device_messages=[],
+                one_time_keys_count={},
+                unused_fallback_keys={},
+                device_list_summary=DeviceListUpdates(),
+            )
+
+        return await self.db_pool.runInteraction(
+            "create_appservice_txn", _create_appservice_txn
+        )
+
     async def complete_appservice_txn(
         self, txn_id: int, service: ApplicationService
     ) -> None:
@@ -354,15 +423,15 @@ class ApplicationServiceTransactionWorkerStore(

         def _get_oldest_unsent_txn(
             txn: LoggingTransaction,
-        ) -> Optional[Tuple[int, str]]:
+        ) -> Optional[Tuple[int, str, Optional[int]]]:
             # Monotonically increasing txn ids, so just select the smallest
             # one in the txns table (we delete them when they are sent)
             txn.execute(
-                "SELECT txn_id, event_ids FROM application_services_txns WHERE as_id=?"
+                "SELECT txn_id, event_ids, stream_id FROM application_services_txns WHERE as_id=?"
                 " ORDER BY txn_id ASC LIMIT 1",
                 (service.id,),
             )
-            return cast(Optional[Tuple[int, str]], txn.fetchone())
+            return cast(Optional[Tuple[int, str, Optional[int]]], txn.fetchone())

         entry = await self.db_pool.runInteraction(
             "get_oldest_unsent_appservice_txn", _get_oldest_unsent_txn
@@ -371,11 +440,14 @@ class ApplicationServiceTransactionWorkerStore(
         if not entry:
             return None

-        txn_id, event_ids_str = entry
+        txn_id, event_ids_str, room_stream_id = entry

         event_ids = db_to_json(event_ids_str)
         events = await self.get_events_as_list(event_ids)

+        if room_stream_id:
+            events += await self.get_deleted_room_members_for_appservice(room_stream_id)
+
         # TODO: to-device messages, one-time key counts, device list summaries and unused
         # fallback keys are not yet populated for catch-up transactions.
         # We likely want to populate those for reliability.

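The storage hunks above (apparently synapse/storage/databases/main/appservice.py) let a transaction row carry a stream_id instead of event IDs: the row is queued with an empty event list, and when the scheduler later picks up the oldest unsent transaction, the synthetic leaves are rebuilt from the deleted_room_members snapshot. The sqlite3 sketch below walks through that flow; the table and column names follow the diff, while expand_txn and the sample data are illustrative.

```python
# Standalone sqlite3 sketch of the catch-up transaction flow above -- not Synapse code.
import json
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript(
    """
    CREATE TABLE application_services_txns (
        as_id TEXT, txn_id INTEGER, event_ids TEXT, stream_id BIGINT
    );
    CREATE TABLE deleted_room_members (
        room_id TEXT NOT NULL, room_version TEXT NOT NULL,
        user_id TEXT NOT NULL, deleted_at_stream_id BIGINT NOT NULL
    );
    """
)

# Room deletion: snapshot the members, then queue a stream-id-only txn for the AS.
db.execute("INSERT INTO deleted_room_members VALUES ('!gone:x', '10', '@alice:x', 105)")
db.execute("INSERT INTO deleted_room_members VALUES ('!gone:x', '10', '@bob:x', 105)")
db.execute(
    "INSERT INTO application_services_txns VALUES (?, ?, ?, ?)",
    ("irc_bridge", 1, json.dumps([]), 105),
)


def expand_txn(as_id: str) -> list:
    """Rebuild synthetic leave events for the oldest unsent txn of the service."""
    txn_id, event_ids, stream_id = db.execute(
        "SELECT txn_id, event_ids, stream_id FROM application_services_txns "
        "WHERE as_id=? ORDER BY txn_id ASC LIMIT 1",
        (as_id,),
    ).fetchone()
    events = json.loads(event_ids)
    if stream_id:
        rows = db.execute(
            "SELECT room_id, room_version, user_id FROM deleted_room_members "
            "WHERE deleted_at_stream_id = ?",
            (stream_id,),
        ).fetchall()
        for room_id, _room_version, user_id in rows:
            events.append(
                {"type": "m.room.member", "room_id": room_id, "state_key": user_id,
                 "sender": user_id, "content": {"membership": "leave"}}
            )
    return events


print(len(expand_txn("irc_bridge")))  # 2 synthetic leaves
```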
@@ -102,6 +102,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         super().__init__(database, db_conn, hs)

         self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
+        self._our_server_name = hs.config.server.server_name

         if (
             self.hs.config.worker.run_background_tasks
@@ -1388,7 +1389,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):

         rows = cast(
             List[Tuple[str, str, str]],
-            await self.db_pool.simple_select_many_batch(
+            await self.db_pool.simple_select_onecol(
                 table="room_memberships",
                 column="event_id",
                 iterable=member_event_ids,
@@ -1845,6 +1846,146 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             "_get_room_participation_txn", _get_room_participation_txn, user_id, room_id
         )

+    async def store_deleted_room_members(
+        self,
+        room_id: str,
+    ) -> int:
+        """Get all local members of a given room and copy them to
+        the deleted room members table.
+
+        This should be run just before a room is deleted (before any
+        kicks are made).
+
+        Args:
+            room_id: the ID of the room
+
+        Returns:
+            The stream ordering at which the room was deleted.
+        """
+
+        max_stream_ordering = self.get_room_max_stream_ordering()
+
+        room_version = await self.get_room_version_id(room_id)
+
+        def _store_deleted_room_members_txn(
+            txn: LoggingTransaction,
+        ) -> None:
+            # This copies the current membership from the room into the deleted members table,
+            # taking care to preserve the old stream ordering for users who were banned or left,
+            # otherwise using the latest stream ordering.
+            sql = """
+                INSERT INTO deleted_room_members (room_id, user_id, deleted_at_stream_id, room_version)
+                SELECT room_id, state_key, (CASE
+                    WHEN (membership = 'ban' OR membership = 'leave') THEN event_stream_ordering
+                    ELSE ?
+                END), ? FROM current_state_events
+                WHERE type = 'm.room.member'
+                AND room_id = ?
+                AND state_key LIKE ?
+            """
+
+            txn.execute(
+                sql,
+                (max_stream_ordering, room_version, room_id, "%" + self._our_server_name),
+            )
+
+        await self.db_pool.runInteraction(
+            "store_deleted_room_members",
+            _store_deleted_room_members_txn,
+        )
+
+        return max_stream_ordering
+
+    async def get_deleted_rooms_for_user(
+        self, user_id: str, stream_pos: int
+    ) -> List[Tuple[str, str, PersistedEventPosition]]:
+        """Get all rooms and stream positions of deleted rooms to
+        send down the user's sync.
+
+        Returns a list of (room_id, room_version, stream position) tuples.
+        """
+
+        def _get_deleted_rooms_for_user(
+            txn: LoggingTransaction,
+        ) -> List[Tuple[str, str, PersistedEventPosition]]:
+            sql = """
+                SELECT room_id, room_version, deleted_at_stream_id FROM deleted_room_members
+                WHERE user_id = ?
+                AND ? < deleted_at_stream_id
+            """
+            txn.execute(sql, (user_id, stream_pos))
+            return [(r[0], r[1], PersistedEventPosition("master", r[2])) for r in txn]
+
+        return await self.db_pool.runInteraction(
+            "get_deleted_rooms_for_user", _get_deleted_rooms_for_user
+        )
+
+    async def has_deleted_rooms_for_user(self, user_id: str, stream_pos: int) -> bool:
+        """Checks if the user has any outstanding deleted rooms to send
+        down sync.
+
+        Returns true if there are rooms, otherwise false.
+        """
+
+        def _has_deleted_rooms_for_user(txn: LoggingTransaction) -> bool:
+            sql = """
+                SELECT 1 FROM deleted_room_members
+                WHERE user_id = ?
+                AND ? < deleted_at_stream_id
+                LIMIT 1
+            """
+            txn.execute(sql, (user_id, stream_pos))
+            return bool(txn.fetchone())
+
+        return await self.db_pool.runInteraction(
+            "has_deleted_rooms_for_user", _has_deleted_rooms_for_user
+        )
+
+    async def get_deleted_room_members_at(
+        self, stream_pos: int
+    ) -> Dict[str, Tuple[str, List[str]]]:
+        """Gets any rooms and users that have been deleted globally at the given stream pos.
+
+        Returns a map of room_id to (room_version, list of deleted local user IDs).
+        """
+
+        def _get_deleted_room_members_at(
+            txn: LoggingTransaction,
+        ) -> Dict[str, Tuple[str, List[str]]]:
+            sql = """
+                SELECT room_id, room_version, user_id FROM deleted_room_members
+                WHERE deleted_at_stream_id = ?
+            """
+            txn.execute(sql, (stream_pos,))
+            room_map: Dict[str, Tuple[str, List[str]]] = {}
+            for room_id, room_version, user_id in txn:
+                if room_id not in room_map:
+                    room_map[room_id] = (room_version, [])
+                room_map[room_id][1].append(user_id)
+            return room_map
+
+        return await self.db_pool.runInteraction(
+            "get_deleted_room_members_at", _get_deleted_room_members_at
+        )
+
+    @cached()
+    async def has_room_been_deleted(self, room_id: str) -> bool:
+        """Checks if a room has been deleted.
+
+        Returns True if there are any members in the deleted room, otherwise False.
+        """
+
+        def _has_room_been_deleted(txn: LoggingTransaction) -> bool:
+            sql = """
+                SELECT 1 FROM deleted_room_members
+                WHERE room_id = ?
+                LIMIT 1
+            """
+            txn.execute(sql, (room_id,))
+            return bool(txn.fetchone())
+
+        return await self.db_pool.runInteraction(
+            "has_room_been_deleted", _has_room_been_deleted
+        )
+

 class RoomMemberBackgroundUpdateStore(SQLBaseStore):
     def __init__(

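These store methods (apparently synapse/storage/databases/main/roommember.py) all revolve around the deleted_room_members snapshot table defined by the schema delta further down: the snapshot preserves the old stream ordering for members who had already been banned or had left, and a later query answers "which rooms were deleted for this user since stream position X". Below is a hedged, standalone sqlite3 sketch of that pattern; the table and column names follow the diff, while the helper functions and sample data are illustrative.

```python
# Standalone sqlite3 sketch of the deleted_room_members pattern above -- not Synapse code.
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript(
    """
    CREATE TABLE current_state_events (
        room_id TEXT, type TEXT, state_key TEXT,
        membership TEXT, event_stream_ordering BIGINT
    );
    CREATE TABLE deleted_room_members (
        room_id TEXT NOT NULL, room_version TEXT NOT NULL,
        user_id TEXT NOT NULL, deleted_at_stream_id BIGINT NOT NULL
    );
    CREATE UNIQUE INDEX deleted_room_member_idx ON deleted_room_members(room_id, user_id);
    """
)
db.executemany(
    "INSERT INTO current_state_events VALUES (?, 'm.room.member', ?, ?, ?)",
    [
        ("!gone:example.org", "@alice:example.org", "join", 90),
        ("!gone:example.org", "@bob:example.org", "ban", 70),   # keeps its old ordering
    ],
)


def store_deleted_room_members(room_id: str, room_version: str, max_stream: int) -> int:
    # Same shape as the INSERT ... SELECT in the diff: banned/left users keep their
    # original stream ordering, everyone else gets the deletion stream ordering.
    db.execute(
        """
        INSERT INTO deleted_room_members (room_id, user_id, deleted_at_stream_id, room_version)
        SELECT room_id, state_key,
               (CASE WHEN membership IN ('ban', 'leave') THEN event_stream_ordering ELSE ? END), ?
        FROM current_state_events
        WHERE type = 'm.room.member' AND room_id = ? AND state_key LIKE ?
        """,
        (max_stream, room_version, room_id, "%:example.org"),
    )
    return max_stream


def get_deleted_rooms_for_user(user_id: str, since_stream: int) -> list:
    return db.execute(
        "SELECT room_id, room_version, deleted_at_stream_id FROM deleted_room_members "
        "WHERE user_id = ? AND ? < deleted_at_stream_id",
        (user_id, since_stream),
    ).fetchall()


store_deleted_room_members("!gone:example.org", "10", 105)
print(get_deleted_rooms_for_user("@alice:example.org", 100))  # [('!gone:example.org', '10', 105)]
print(get_deleted_rooms_for_user("@bob:example.org", 100))    # [] -- the ban kept ordering 70
```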
@@ -19,7 +19,7 @@
 #
 #

-SCHEMA_VERSION = 92  # remember to update the list below when updating
+SCHEMA_VERSION = 93  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema

 This should be incremented whenever the codebase changes its requirements on the
@@ -168,6 +168,9 @@ Changes in SCHEMA_VERSION = 91

 Changes in SCHEMA_VERSION = 92
     - Cleaned up a trigger that was added in #18260 and then reverted.
+Changes in SCHEMA_VERSION = 93:
+    - Add `deleted_room_members` table.
+    - Add `stream_id` to the `application_services_txns` table.
 """

New schema delta file:

@@ -0,0 +1,20 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2025 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+CREATE TABLE deleted_room_members (
+    room_id TEXT NOT NULL,
+    room_version TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    deleted_at_stream_id bigint NOT NULL
+);
+CREATE UNIQUE INDEX deleted_room_member_idx ON deleted_room_members(room_id, user_id);
synapse/storage/schema/main/delta/93/02_as_add_stream_id.sql (new file, 14 lines):
@@ -0,0 +1,14 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2025 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+ALTER TABLE application_services_txns ADD COLUMN stream_id BIGINT;