Mirror of https://github.com/element-hq/synapse.git, synced 2025-12-13 01:50:46 +00:00.

Compare commits: 159 commits, `devon/old-`... → `erikj/ss_h`...
Commit SHAs (159 total; author and date columns were empty in this mirror view):

4fd68e2736 c5a90575bf a217155570 90d0e035dd ab414f2ab8 6f9932d146 bb905cd02c 7c9c62051c
da463fb102 8468401a97 53b7309f6c 94e1a54687 a507f152c9 9d08bc2157 56a4c0ba6e 85a60c3132
e5e7269998 c8e17f7479 4dc9e268e6 9a7d8c2be4 c51a309da5 9764f626ea 7a0c281028 7fe5d31e20
53473a0eb4 eb3c84cf45 6a44686dc3 a94c1dd62c 8bddbe23bd addb91485f 9795556052 c44db28958
c6204d3fa1 1ead5e0c0e 8d1d8f9b3b 651e520292 f457dbee35 77d3fa8b9e 4a9d81f6ad 96f476d9b4
e2501a0bd7 a57d47b778 b6a7d2bf6c f8926d07df 21cc97ba9d fdb8b5931f 4b866c4fca 088a4c7cf0
0726a6d58b 6edc4c78ce 44432e2118 bcba8cccfe 693c06b2f1 4d87fa61c6 d61aada8ba 6723824c4a
980ee9aad6 03eac5ae60 ed7591cbef 3838b18d3b b3d8e2d2bd d1ee253bef 87d53368d7 e34d634778
7087c7c3d5 fc73b6ffc9 9b8d2017af 5b77f4a67a e2ade85250 339500d067 31300f4ce5 b45b1896aa
ee2ef0b4d9 0a938b137a 97248362d0 02711552cf 8ddf5c7235 513ec8e906 cda2311520 c612572d12
5b1db39bb7 e7a3328228 f6d7ffd9c5 772c501bb6 cda92af4a6 d3f90e4bd8 a5e06c6a8d 0233e20aa3
357132db1d 726a8e9698 cc200ee9f5 3eb77c3a2a 45c89ec625 ac5b05c86b 2964c567d3 95d39db772
6cc6bdbedf 574a04a40f 8ee2e114dd 98fb56e5fe df0c57d383 d2f5247e77 c89d859c7c 2ec93e3f0d
fa63c02648 419be7c6b2 ef5f0fca3a fb5af8f5fa 8461faf384 6c2fc1d20f cbeff57402 4b42e44ef9
d113e743ae 23e0d34a2d 1c931cb3e7 9f551f0e97 c8508f113a f49003c35c 8b0e1692f9 5df94f47b5
3566abd9bc 96a4614f92 dc447a673f 32ae162278 a90f3d4ae2 eb3a185cfc 517946d940 f600eacd0d
3423eb72d5 5589ae48ca 83a5858083 3e1f24ea11 ab074f5335 53232e6df5 f069659343 552f8f496d
0af3b4822c ed47a7eff5 3367422fd3 ca909013c8 cc2d2b6b9f bc3796d333 5cf3ad3d7f bf78692ba0
a1aaa47dad c590474757 5b1053f23e 68a3daf605 61cea4e9b7 87d95615d4 cb335805d4 2f3bd27284
f96d0c36a3 1a251d5211 2b5f07d714 ad1c887b4c 8392d6ac3b e7e9cb289d d26ac746d4
changelog.d/17512.misc (new file):

```diff
@@ -0,0 +1 @@
+Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
```

changelog.d/17599.misc (new file):

```diff
@@ -0,0 +1 @@
+Store sliding sync per-connection state in the database.
```

changelog.d/17600.misc (new file):

```diff
@@ -0,0 +1 @@
+Make the sliding sync `PerConnectionState` class immutable.
```
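Taken together, the 17599 and 17600 entries describe one refactor: per-connection sliding sync state becomes an immutable snapshot, and the pending updates layered on top of it are what get persisted to the database. Below is a minimal standard-library sketch of that snapshot/mutable-view pattern. The field shape is simplified for illustration; the real classes (visible in the diffs further down) are attrs classes that track several streams, so treat this as an assumed sketch rather than Synapse's implementation.

```python
# Sketch only: real types live in synapse/types/handlers/sliding_sync.py.
from collections import ChainMap
from dataclasses import dataclass, field
from typing import Mapping


@dataclass(frozen=True)
class PerConnectionState:
    # room_id -> status flag; frozen, so a cached snapshot cannot be mutated.
    rooms: Mapping[str, str] = field(default_factory=dict)

    def get_mutable(self) -> "MutablePerConnectionState":
        # Layer an empty dict over the snapshot: writes land in the top map,
        # reads fall through to the immutable base underneath.
        return MutablePerConnectionState(ChainMap({}, dict(self.rooms)))


class MutablePerConnectionState:
    def __init__(self, rooms: "ChainMap[str, str]") -> None:
        self.rooms = rooms

    def has_updates(self) -> bool:
        # maps[0] is the top layer and holds only the changes made since the
        # snapshot was taken; this delta is what gets persisted.
        return bool(self.rooms.maps[0])


base = PerConnectionState(rooms={"!a:example.org": "LIVE"})
mutable = base.get_mutable()
assert not mutable.has_updates()

mutable.rooms["!b:example.org"] = "PREVIOUSLY"
assert mutable.has_updates()
assert mutable.rooms["!a:example.org"] == "LIVE"  # reads fall through
assert base.rooms == {"!a:example.org": "LIVE"}   # snapshot untouched
```

The same `ChainMap.maps[0]` trick shows up in the new `PerConnectionStateDB.from_state` below, which persists only the top layer.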
Hunks from the mypy plugin's `is_cacheable` checker:

```diff
@@ -38,6 +38,7 @@ from mypy.types import (
     NoneType,
     TupleType,
     TypeAliasType,
+    TypeVarType,
     UninhabitedType,
     UnionType,
 )
@@ -233,6 +234,7 @@ IMMUTABLE_CUSTOM_TYPES = {
     "synapse.synapse_rust.push.FilteredPushRules",
    # This is technically not immutable, but close enough.
     "signedjson.types.VerifyKey",
+    "synapse.types.StrCollection",
 }

 # Immutable containers only if the values are also immutable.
@@ -298,7 +300,7 @@ def is_cacheable(

         elif rt.type.fullname in MUTABLE_CONTAINER_TYPES:
             # Mutable containers are mutable regardless of their underlying type.
-            return False, None
+            return False, f"container {rt.type.fullname} is mutable"

         elif "attrs" in rt.type.metadata:
             # attrs classes are only cachable iff it is frozen (immutable itself)
@@ -318,6 +320,9 @@ def is_cacheable(
             else:
                 return False, "non-frozen attrs class"

+        elif rt.type.is_enum:
+            # We assume Enum values are immutable
+            return True, None
         else:
             # Ensure we fail for unknown types, these generally means that the
             # above code is not complete.
@@ -326,6 +331,18 @@ def is_cacheable(
                 f"Don't know how to handle {rt.type.fullname} return type instance",
             )

+    elif isinstance(rt, TypeVarType):
+        # We consider TypeVars immutable if they are bound to a set of immutable
+        # types.
+        if rt.values:
+            for value in rt.values:
+                ok, note = is_cacheable(value, signature, verbose)
+                if not ok:
+                    return False, f"TypeVar bound not cacheable {value}"
+            return True, None
+
+        return False, "TypeVar is unbound"
+
     elif isinstance(rt, NoneType):
         # None is cachable.
         return True, None
```
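The new `TypeVarType` branch treats a value-restricted TypeVar as cacheable only when every permitted value type is itself cacheable, and rejects unbound TypeVars outright. The sketch below mirrors that rule using plain `typing.TypeVar` objects rather than mypy's internal types; the `IMMUTABLE` set is a stand-in for the plugin's real type tables.

```python
from typing import TypeVar

# Stand-in for the plugin's tables of known-immutable types.
IMMUTABLE = {str, int, bytes, frozenset, type(None)}


def typevar_is_cacheable(tv: TypeVar) -> bool:
    if tv.__constraints__:
        # e.g. TypeVar("T", str, int): every allowed value type must be immutable.
        return all(c in IMMUTABLE for c in tv.__constraints__)
    # Unbound: nothing to check against, so refuse ("TypeVar is unbound").
    return False


T = TypeVar("T", str, int)
U = TypeVar("U")
V = TypeVar("V", str, list)

assert typevar_is_cacheable(T)      # str and int are both immutable
assert not typevar_is_cacheable(U)  # unbound TypeVar is rejected
assert not typevar_is_cacheable(V)  # list is mutable, so V is not cacheable
```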
Hunk adding the new sliding-sync boolean columns to `BOOLEAN_COLUMNS`:

```diff
@@ -129,6 +129,11 @@ BOOLEAN_COLUMNS = {
     "remote_media_cache": ["authenticated"],
     "room_stats_state": ["is_federatable"],
     "rooms": ["is_public", "has_auth_chain_index"],
+    "sliding_sync_joined_rooms": ["is_encrypted"],
+    "sliding_sync_membership_snapshots": [
+        "has_known_state",
+        "is_encrypted",
+    ],
     "users": ["shadow_banned", "approved", "locked", "suspended"],
     "un_partial_stated_event_stream": ["rejection_status_changed"],
     "users_who_share_rooms": ["share_private"],
```
Hunk adding a tombstone constant to `EventContentFields`:

```diff
@@ -245,6 +245,8 @@ class EventContentFields:
     # `m.room.encryption`` algorithm field
     ENCRYPTION_ALGORITHM: Final = "algorithm"

+    TOMBSTONE_SUCCESSOR_ROOM: Final = "replacement_room"
+

 class EventUnsignedContentFields:
     """Fields found inside the 'unsigned' data on events"""
```
Hunks wiring `SlidingSyncStore` into `GenericWorkerStore`:

```diff
@@ -98,6 +98,7 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.databases.main.search import SearchStore
 from synapse.storage.databases.main.session import SessionStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
+from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
 from synapse.storage.databases.main.state import StateGroupWorkerStore
 from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
@@ -159,6 +160,7 @@ class GenericWorkerStore(
     SessionStore,
     TaskSchedulerWorkerStore,
     ExperimentalFeaturesStore,
+    SlidingSyncStore,
 ):
     # Properties that multiple storage classes define. Tell mypy what the
     # expected type is.
```
Hunks from the sliding sync handler (`SlidingSyncHandler`):

```diff
@@ -17,6 +17,7 @@ import logging
 from itertools import chain
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Any,
     Dict,
     List,
@@ -45,13 +46,6 @@ from synapse.events.utils import parse_stripped_state_event, strip_event
 from synapse.handlers.relations import BundledAggregations
 from synapse.handlers.sliding_sync.extensions import SlidingSyncExtensionHandler
 from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore
-from synapse.handlers.sliding_sync.types import (
-    HaveSentRoomFlag,
-    MutablePerConnectionState,
-    PerConnectionState,
-    RoomSyncConfig,
-    StateValues,
-)
 from synapse.logging.opentracing import (
     SynapseTags,
     log_kv,
@@ -83,7 +77,17 @@ from synapse.types import (
     StreamToken,
     UserID,
 )
-from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
+from synapse.types.handlers.sliding_sync import (
+    SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
+    HaveSentRoomFlag,
+    MutablePerConnectionState,
+    OperationType,
+    PerConnectionState,
+    RoomSyncConfig,
+    SlidingSyncConfig,
+    SlidingSyncResult,
+    StateValues,
+)
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import concurrently_execute
 from synapse.visibility import filter_events_for_client
@@ -107,18 +111,6 @@ class Sentinel(enum.Enum):
     UNSET_SENTINEL = object()


-# The event types that clients should consider as new activity.
-DEFAULT_BUMP_EVENT_TYPES = {
-    EventTypes.Create,
-    EventTypes.Message,
-    EventTypes.Encrypted,
-    EventTypes.Sticker,
-    EventTypes.CallInvite,
-    EventTypes.PollStart,
-    EventTypes.LiveLocationShareStart,
-}
-
-
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _RoomMembershipForUser:
     """
@@ -206,7 +198,7 @@ class SlidingSyncHandler:
         self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
         self.is_mine_id = hs.is_mine_id

-        self.connection_store = SlidingSyncConnectionStore()
+        self.connection_store = SlidingSyncConnectionStore(self.store)
         self.extensions = SlidingSyncExtensionHandler(hs)

     async def wait_for_sync_for_user(
@@ -330,11 +322,7 @@
                 sync_config, from_token
             )
         )
-        await self.connection_store.mark_token_seen(
-            sync_config=sync_config,
-            from_token=from_token,
-        )
+        new_connection_state = previous_connection_state.get_mutable()

         # Get all of the room IDs that the user should be able to see in the sync
         # response
@@ -352,6 +340,10 @@
             )
         )

+        lists_to_rooms: Mapping[str, AbstractSet[str]] = {}
+        if previous_connection_state is not None:
+            lists_to_rooms = previous_connection_state.list_to_rooms
+
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         # Keep track of the rooms that we can display and need to fetch more info about
@@ -368,6 +360,15 @@

         for list_key, list_config in sync_config.lists.items():
             # Apply filters
+            previous_found_rooms = lists_to_rooms.get(list_key)
+            if previous_found_rooms:
+                filtered_sync_room_map = {
+                    room_id: sync_room_map[room_id]
+                    for room_id in previous_found_rooms
+                }
+
+                # TODO: Record changes to the list.
+            else:
                 filtered_sync_room_map = sync_room_map
             if list_config.filters is not None:
                 filtered_sync_room_map = await self.filter_rooms(
@@ -377,6 +378,10 @@
                     to_token,
                 )

+            new_connection_state.list_to_rooms[list_key] = set(
+                filtered_sync_room_map.keys()
+            )
+
             # Find which rooms are partially stated and may need to be filtered out
             # depending on the `required_state` requested (see below).
             partial_state_room_map = (
@@ -430,15 +435,11 @@
                         room_id
                     )
                     if existing_room_sync_config is not None:
-                        existing_room_sync_config.combine_room_sync_config(
+                        room_sync_config = existing_room_sync_config.combine_room_sync_config(
                             room_sync_config
                         )
-                    else:
-                        # Make a copy so if we modify it later, it doesn't
-                        # affect all references.
-                        relevant_room_map[room_id] = (
-                            room_sync_config.deep_copy()
-                        )
+
+                    relevant_room_map[room_id] = room_sync_config

                     room_ids_in_list.append(room_id)

@@ -503,10 +504,12 @@
                 # and need to fetch more info about.
                 existing_room_sync_config = relevant_room_map.get(room_id)
                 if existing_room_sync_config is not None:
+                    room_sync_config = (
                         existing_room_sync_config.combine_room_sync_config(
                             room_sync_config
                         )
-                else:
+                    )
+
                 relevant_room_map[room_id] = room_sync_config

                 # Fetch room data
@@ -569,8 +572,6 @@
             if room_id in rooms_should_send
         }

-        new_connection_state = previous_connection_state.get_mutable()
-
         @trace
         @tag_args
         async def handle_room(room_id: str) -> None:
@@ -2229,7 +2230,9 @@
         # Figure out the last bump event in the room
         last_bump_event_result = (
             await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+                room_id,
+                to_token.room_key,
+                event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
             )
         )
```
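Two of the hunks above change `combine_room_sync_config` from mutating `existing_room_sync_config` in place to returning a merged config that the caller rebinds, which is exactly what an immutable `RoomSyncConfig` forces (and why the defensive `deep_copy()` disappears). The sketch below illustrates the shape of that refactor; the field names and the merge rule shown here (max timeline limit, union of required state) are assumptions for illustration, not Synapse's exact definitions.

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple


@dataclass(frozen=True)
class RoomSyncConfig:
    timeline_limit: int
    required_state: FrozenSet[Tuple[str, str]]  # (event type, state key) pairs

    def combine_room_sync_config(self, other: "RoomSyncConfig") -> "RoomSyncConfig":
        # Frozen dataclass: cannot mutate self, so return a new merged config.
        return RoomSyncConfig(
            timeline_limit=max(self.timeline_limit, other.timeline_limit),
            required_state=self.required_state | other.required_state,
        )


a = RoomSyncConfig(10, frozenset({("m.room.name", "")}))
b = RoomSyncConfig(20, frozenset({("m.room.topic", "")}))
merged = a.combine_room_sync_config(b)  # caller rebinds instead of mutating
assert merged.timeline_limit == 20 and len(merged.required_state) == 2
assert a.timeline_limit == 10  # originals are untouched and safely shareable
```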
Hunks from the sliding sync extensions handler (imports moved to `synapse.types.handlers.sliding_sync`):

```diff
@@ -19,11 +19,6 @@ from typing_extensions import assert_never

 from synapse.api.constants import AccountDataTypes
 from synapse.handlers.receipts import ReceiptEventSource
-from synapse.handlers.sliding_sync.types import (
-    HaveSentRoomFlag,
-    MutablePerConnectionState,
-    PerConnectionState,
-)
 from synapse.logging.opentracing import trace
 from synapse.types import (
     DeviceListUpdates,
@@ -32,7 +27,14 @@ from synapse.types import (
     SlidingSyncStreamToken,
     StreamToken,
 )
-from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
+from synapse.types.handlers.sliding_sync import (
+    HaveSentRoomFlag,
+    MutablePerConnectionState,
+    OperationType,
+    PerConnectionState,
+    SlidingSyncConfig,
+    SlidingSyncResult,
+)

 if TYPE_CHECKING:
     from synapse.server import HomeServer
```
Hunks from `SlidingSyncConnectionStore`, which now delegates persistence to the database instead of holding per-connection state in memory:

```diff
@@ -13,18 +13,18 @@
 #

 import logging
-from typing import TYPE_CHECKING, Dict, Optional, Tuple
+from typing import TYPE_CHECKING, Optional

 import attr

-from synapse.api.errors import SlidingSyncUnknownPosition
-from synapse.handlers.sliding_sync.types import (
+from synapse.logging.opentracing import trace
+from synapse.storage.databases.main import DataStore
+from synapse.types import SlidingSyncStreamToken
+from synapse.types.handlers.sliding_sync import (
     MutablePerConnectionState,
     PerConnectionState,
+    SlidingSyncConfig,
 )
-from synapse.logging.opentracing import trace
-from synapse.types import SlidingSyncStreamToken
-from synapse.types.handlers import SlidingSyncConfig

 if TYPE_CHECKING:
     pass
@@ -61,20 +61,7 @@ class SlidingSyncConnectionStore:
     to mapping of room ID to `HaveSentRoom`.
     """

-    # `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState`
-    _connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory(
-        dict
-    )
-
-    async def is_valid_token(
-        self, sync_config: SlidingSyncConfig, connection_token: int
-    ) -> bool:
-        """Return whether the connection token is valid/recognized"""
-        if connection_token == 0:
-            return True
-
-        conn_key = self._get_connection_key(sync_config)
-        return connection_token in self._connections.get(conn_key, {})
+    store: "DataStore"

     async def get_per_connection_state(
         self,
@@ -86,23 +73,20 @@ class SlidingSyncConnectionStore:
         Raises:
             SlidingSyncUnknownPosition if the connection_token is unknown
         """
-        if from_token is None:
+        if from_token is None or from_token.connection_position == 0:
             return PerConnectionState()

-        connection_position = from_token.connection_position
-        if connection_position == 0:
-            # Initial sync (request without a `from_token`) starts at `0` so
-            # there is no existing per-connection state
-            return PerConnectionState()
-
-        conn_key = self._get_connection_key(sync_config)
-        sync_statuses = self._connections.get(conn_key, {})
-        connection_state = sync_statuses.get(connection_position)
-
-        if connection_state is None:
-            raise SlidingSyncUnknownPosition()
-
-        return connection_state
+        conn_id = sync_config.conn_id or ""
+
+        device_id = sync_config.requester.device_id
+        assert device_id is not None
+
+        return await self.store.get_per_connection_state(
+            sync_config.user.to_string(),
+            device_id,
+            conn_id,
+            from_token.connection_position,
+        )

     @trace
     async def record_new_state(
@@ -116,85 +100,24 @@ class SlidingSyncConnectionStore:
         If there are no changes to the state this may return the same token as
         the existing per-connection state.
         """
-        prev_connection_token = 0
-        if from_token is not None:
-            prev_connection_token = from_token.connection_position
-
         if not new_connection_state.has_updates():
-            return prev_connection_token
+            if from_token is not None:
+                return from_token.connection_position
+            else:
+                return 0
+
+        if from_token is not None and from_token.connection_position == 0:
+            from_token = None

-        conn_key = self._get_connection_key(sync_config)
-        sync_statuses = self._connections.setdefault(conn_key, {})
-
-        # Generate a new token, removing any existing entries in that token
-        # (which can happen if requests get resent).
-        new_store_token = prev_connection_token + 1
-        sync_statuses.pop(new_store_token, None)
-
-        # We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
-        # don't grow forever.
-        sync_statuses[new_store_token] = new_connection_state.copy()
-
-        return new_store_token
-
-    @trace
-    async def mark_token_seen(
-        self,
-        sync_config: SlidingSyncConfig,
-        from_token: Optional[SlidingSyncStreamToken],
-    ) -> None:
-        """We have received a request with the given token, so we can clear out
-        any other tokens associated with the connection.
-
-        If there is no from token then we have started afresh, and so we delete
-        all tokens associated with the device.
-        """
-        # Clear out any tokens for the connection that doesn't match the one
-        # from the request.
-        conn_key = self._get_connection_key(sync_config)
-        sync_statuses = self._connections.pop(conn_key, {})
-        if from_token is None:
-            return
-
-        sync_statuses = {
-            connection_token: room_statuses
-            for connection_token, room_statuses in sync_statuses.items()
-            if connection_token == from_token.connection_position
-        }
-        if sync_statuses:
-            self._connections[conn_key] = sync_statuses
-
-    @staticmethod
-    def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]:
-        """Return a unique identifier for this connection.
-
-        The first part is simply the user ID.
-
-        The second part is generally a combination of device ID and conn_id.
-        However, both these two are optional (e.g. puppet access tokens don't
-        have device IDs), so this handles those edge cases.
-
-        We use this over the raw `conn_id` to avoid clashes between different
-        clients that use the same `conn_id`. Imagine a user uses a web client
-        that uses `conn_id: main_sync_loop` and an Android client that also has
-        a `conn_id: main_sync_loop`.
-        """
-
-        user_id = sync_config.user.to_string()
-
-        # Only one sliding sync connection is allowed per given conn_id (empty
-        # or not).
         conn_id = sync_config.conn_id or ""

-        if sync_config.requester.device_id:
-            return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}")
-
-        if sync_config.requester.access_token_id:
-            # If we don't have a device, then the access token ID should be a
-            # stable ID.
-            return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}")
-
-        # If we have neither then its likely an AS or some weird token. Either
-        # way we can just fail here.
-        raise Exception("Cannot use sliding sync with access token type")
+        device_id = sync_config.requester.device_id
+        assert device_id is not None
+
+        return await self.store.persist_per_connection_state(
+            sync_config.user.to_string(),
+            device_id,
+            conn_id,
+            from_token.connection_position if from_token else None,
+            new_connection_state,
+        )
```
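With the in-memory `_connections` dict gone, the connection-position lifecycle implied by this code is: persisting updates mints a fresh position chained onto the previous one, presenting a position validates that it belongs to the caller, and once a position has been used the other positions for that connection can be pruned (an unknown or pruned position raises `SlidingSyncUnknownPosition`). The in-memory model below is an assumed simplification of those semantics for illustration; the real persistence is the `SlidingSyncStore` shown later in this diff.

```python
from itertools import count
from typing import Dict, Optional


class SlidingSyncUnknownPosition(Exception):
    pass


class ConnectionModel:
    """Toy stand-in for the DB-backed per-connection state (sketch only)."""

    def __init__(self) -> None:
        self._next = count(1)
        self._positions: Dict[int, dict] = {}

    def persist(self, prev_position: Optional[int], updates: dict) -> int:
        if prev_position is None:
            # Restart: drop all prior positions so one-shot requests
            # don't stack up entries.
            self._positions.clear()
            base: dict = {}
        elif prev_position not in self._positions:
            raise SlidingSyncUnknownPosition()
        else:
            base = self._positions[prev_position]
        position = next(self._next)
        self._positions[position] = {**base, **updates}
        return position

    def get(self, position: int) -> dict:
        if position not in self._positions:
            raise SlidingSyncUnknownPosition()
        state = self._positions[position]
        # The client has used this position, so older ones are now dead.
        self._positions = {position: state}
        return state


conn = ConnectionModel()
p1 = conn.persist(None, {"!a:example.org": "LIVE"})
p2 = conn.persist(p1, {"!b:example.org": "LIVE"})
assert conn.get(p2) == {"!a:example.org": "LIVE", "!b:example.org": "LIVE"}
try:
    conn.get(p1)  # p1 was pruned once p2 was used
except SlidingSyncUnknownPosition:
    pass
```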
Hunk threading sliding-sync table changes through `update_current_state`:

```diff
@@ -502,8 +502,15 @@ class EventsPersistenceStorageController:
         """
         state = await self._calculate_current_state(room_id)
         delta = await self._calculate_state_delta(room_id, state)
+        sliding_sync_table_changes = (
+            await self.persist_events_store._calculate_sliding_sync_table_changes(
+                room_id, [], delta
+            )
+        )

-        await self.persist_events_store.update_current_state(room_id, delta)
+        await self.persist_events_store.update_current_state(
+            room_id, delta, sliding_sync_table_changes
+        )

     async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
         """Calculate the current state of a room, based on the forward extremities
```
Hunks from `DatabasePool` (a new `simple_insert_returning_txn` helper, `Mapping` in upsert/update signatures, and `@staticmethod` conversions):

```diff
@@ -35,6 +35,7 @@ from typing import (
     Iterable,
     Iterator,
     List,
+    Mapping,
     Optional,
     Sequence,
     Tuple,
@@ -64,6 +65,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.background_updates import BackgroundUpdater
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.types import Connection, Cursor, SQLQueryParameters
+from synapse.types import StrCollection
 from synapse.util.async_helpers import delay_cancellation
 from synapse.util.iterutils import batch_iter

@@ -1095,6 +1097,50 @@ class DatabasePool:

         txn.execute(sql, vals)

+    @staticmethod
+    def simple_insert_returning_txn(
+        txn: LoggingTransaction,
+        table: str,
+        values: Dict[str, Any],
+        returning: StrCollection,
+    ) -> Tuple[Any, ...]:
+        """Executes a `INSERT INTO... RETURNING...` statement (or equivalent for
+        SQLite versions that don't support it).
+        """
+
+        if txn.database_engine.supports_returning:
+            keys, vals = zip(*values.items())
+
+            sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % (
+                table,
+                ", ".join(k for k in keys),
+                ", ".join("?" for _ in keys),
+                ", ".join(k for k in returning),
+            )
+
+            txn.execute(sql, vals)
+            row = txn.fetchone()
+            assert row is not None
+            return row
+        else:
+            # For old versions of SQLite we do a standard insert and then can
+            # use `last_insert_rowid` to get at the row we just inserted
+            DatabasePool.simple_insert_txn(
+                txn,
+                table=table,
+                values=values,
+            )
+            txn.execute("SELECT last_insert_rowid()")
+            row = txn.fetchone()
+            assert row is not None
+            (rowid,) = row
+
+            row = DatabasePool.simple_select_one_txn(
+                txn, table=table, keyvalues={"rowid": rowid}, retcols=returning
+            )
+            assert row is not None
+            return row
+
     async def simple_insert_many(
         self,
         table: str,
@@ -1254,9 +1300,9 @@ class DatabasePool:
         self,
         txn: LoggingTransaction,
         table: str,
-        keyvalues: Dict[str, Any],
-        values: Dict[str, Any],
-        insertion_values: Optional[Dict[str, Any]] = None,
+        keyvalues: Mapping[str, Any],
+        values: Mapping[str, Any],
+        insertion_values: Optional[Mapping[str, Any]] = None,
         where_clause: Optional[str] = None,
     ) -> bool:
         """
@@ -1299,9 +1345,9 @@ class DatabasePool:
         self,
         txn: LoggingTransaction,
         table: str,
-        keyvalues: Dict[str, Any],
-        values: Dict[str, Any],
-        insertion_values: Optional[Dict[str, Any]] = None,
+        keyvalues: Mapping[str, Any],
+        values: Mapping[str, Any],
+        insertion_values: Optional[Mapping[str, Any]] = None,
         where_clause: Optional[str] = None,
         lock: bool = True,
     ) -> bool:
@@ -1322,7 +1368,7 @@ class DatabasePool:

         if lock:
             # We need to lock the table :(
-            self.engine.lock_table(txn, table)
+            txn.database_engine.lock_table(txn, table)

         def _getwhere(key: str) -> str:
             # If the value we're passing in is None (aka NULL), we need to use
@@ -1376,13 +1422,13 @@ class DatabasePool:
         # successfully inserted
         return True

+    @staticmethod
     def simple_upsert_txn_native_upsert(
-        self,
         txn: LoggingTransaction,
         table: str,
-        keyvalues: Dict[str, Any],
-        values: Dict[str, Any],
-        insertion_values: Optional[Dict[str, Any]] = None,
+        keyvalues: Mapping[str, Any],
+        values: Mapping[str, Any],
+        insertion_values: Optional[Mapping[str, Any]] = None,
         where_clause: Optional[str] = None,
     ) -> bool:
         """
@@ -1535,8 +1581,8 @@ class DatabasePool:

         self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)

+    @staticmethod
     def simple_upsert_many_txn_native_upsert(
-        self,
         txn: LoggingTransaction,
         table: str,
         key_names: Collection[str],
@@ -1966,8 +2012,8 @@ class DatabasePool:
     def simple_update_txn(
         txn: LoggingTransaction,
         table: str,
-        keyvalues: Dict[str, Any],
-        updatevalues: Dict[str, Any],
+        keyvalues: Mapping[str, Any],
+        updatevalues: Mapping[str, Any],
     ) -> int:
         """
         Update rows in the given database table.
```
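The new `simple_insert_returning_txn` helper has two paths: native `INSERT ... RETURNING` when the engine supports it, and a plain insert followed by `last_insert_rowid()` plus a re-select on SQLite versions that lack `RETURNING` (which arrived in SQLite 3.35). The standalone `sqlite3` script below demonstrates both paths; the table and column names are illustrative, not Synapse's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE connections (connection_key INTEGER PRIMARY KEY, conn_id TEXT)"
)

# RETURNING requires SQLite >= 3.35.0.
supports_returning = sqlite3.sqlite_version_info >= (3, 35, 0)

if supports_returning:
    row = conn.execute(
        "INSERT INTO connections (conn_id) VALUES (?) RETURNING connection_key",
        ("main",),
    ).fetchone()
else:
    # Fallback: plain insert, then fetch the row we just created via its rowid.
    conn.execute("INSERT INTO connections (conn_id) VALUES (?)", ("main",))
    (rowid,) = conn.execute("SELECT last_insert_rowid()").fetchone()
    row = conn.execute(
        "SELECT connection_key FROM connections WHERE rowid = ?", (rowid,)
    ).fetchone()

(connection_key,) = row
print(connection_key)  # -> 1
```

The sliding-sync store below uses exactly this helper to mint `connection_key` and `connection_position` values.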
Hunks wiring `SlidingSyncStore` into `DataStore`:

```diff
@@ -33,6 +33,7 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
+from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
 from synapse.storage.databases.main.stats import UserSortOrder
 from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.types import Cursor
@@ -156,6 +157,7 @@ class DataStore(
     LockStore,
     SessionStore,
     TaskSchedulerWorkerStore,
+    SlidingSyncStore,
 ):
     def __init__(
         self,
```
Hunk from `CacheInvalidationWorkerStore`:

```diff
@@ -313,6 +313,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             "get_unread_event_push_actions_by_room_for_user", (room_id,)
         )

+        self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,))
+
         # The `_get_membership_from_event_id` is immutable, except for the
         # case where we look up an event *before* persisting it.
         self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))
```
(File diff suppressed because it is too large.)
Hunks from `EventsWorkerStore` (docstring clarifications):

```diff
@@ -457,6 +457,8 @@ class EventsWorkerStore(SQLBaseStore):
     ) -> Optional[EventBase]:
         """Get an event from the database by event_id.

+        Events for unknown room versions will also be filtered out.
+
         Args:
             event_id: The event_id of the event to fetch

@@ -511,6 +513,10 @@ class EventsWorkerStore(SQLBaseStore):
     ) -> Dict[str, EventBase]:
         """Get events from the database

+        Unknown events will be omitted from the response.
+
+        Events for unknown room versions will also be filtered out.
+
         Args:
             event_ids: The event_ids of the events to fetch

@@ -553,6 +559,8 @@ class EventsWorkerStore(SQLBaseStore):

         Unknown events will be omitted from the response.

+        Events for unknown room versions will also be filtered out.
+
         Args:
             event_ids: The event_ids of the events to fetch
```
Hunk from `PurgeEventsStore`:

```diff
@@ -454,6 +454,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             # so must be deleted first.
             "local_current_membership",
             "room_memberships",
+            # Note: the sliding_sync_ tables have foreign keys to the `events` table
+            # so must be deleted first.
+            "sliding_sync_joined_rooms",
+            "sliding_sync_membership_snapshots",
             "events",
             "federation_inbound_events_staging",
             "receipts_graph",
```
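The added comment explains the ordering constraint: the new `sliding_sync_*` tables have foreign keys into `events`, so a purge must delete them before the parent rows. The small `sqlite3` sketch below shows the failure mode with a simplified stand-in schema (not Synapse's real DDL).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE sliding_sync_joined_rooms ("
    "room_id TEXT, event_id TEXT REFERENCES events(event_id))"
)
conn.execute("INSERT INTO events VALUES ('$ev1')")
conn.execute("INSERT INTO sliding_sync_joined_rooms VALUES ('!a:example.org', '$ev1')")

try:
    conn.execute("DELETE FROM events")  # parent first: foreign key violation
except sqlite3.IntegrityError as e:
    print("blocked:", e)

# Child table first, then deleting the parent succeeds.
conn.execute("DELETE FROM sliding_sync_joined_rooms")
conn.execute("DELETE FROM events")
```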
Hunk from `RoomMemberWorkerStore`:

```diff
@@ -1337,6 +1337,12 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             keyvalues={"user_id": user_id, "room_id": room_id},
             updatevalues={"forgotten": 1},
         )
+        self.db_pool.simple_update_txn(
+            txn,
+            table="sliding_sync_membership_snapshots",
+            keyvalues={"user_id": user_id, "room_id": room_id},
+            updatevalues={"forgotten": 1},
+        )

         self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
         self._invalidate_cache_and_stream(
```
506
synapse/storage/databases/main/sliding_sync.py
Normal file
506
synapse/storage/databases/main/sliding_sync.py
Normal file
@@ -0,0 +1,506 @@
|
|||||||
|
#
|
||||||
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||||
|
#
|
||||||
|
# Copyright (C) 2023 New Vector, Ltd
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as
|
||||||
|
# published by the Free Software Foundation, either version 3 of the
|
||||||
|
# License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# See the GNU Affero General Public License for more details:
|
||||||
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
|
#
|
||||||
|
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, cast
|
||||||
|
|
||||||
|
import attr
|
||||||
|
|
||||||
|
from synapse.api.errors import SlidingSyncUnknownPosition
|
||||||
|
from synapse.logging.opentracing import log_kv
|
||||||
|
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||||
|
from synapse.storage.database import LoggingTransaction
|
||||||
|
from synapse.types import MultiWriterStreamToken, RoomStreamToken
|
||||||
|
from synapse.types.handlers.sliding_sync import (
|
||||||
|
HaveSentRoom,
|
||||||
|
HaveSentRoomFlag,
|
||||||
|
MutablePerConnectionState,
|
||||||
|
PerConnectionState,
|
||||||
|
RoomStatusMap,
|
||||||
|
RoomSyncConfig,
|
||||||
|
)
|
||||||
|
from synapse.util import json_encoder
|
||||||
|
from synapse.util.caches.descriptors import cached
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from synapse.storage.databases.main import DataStore
|
||||||
|
|
||||||
|
|
||||||
|
class SlidingSyncStore(SQLBaseStore):
|
||||||
|
async def persist_per_connection_state(
|
||||||
|
self,
|
||||||
|
user_id: str,
|
||||||
|
device_id: str,
|
||||||
|
conn_id: str,
|
||||||
|
previous_connection_position: Optional[int],
|
||||||
|
per_connection_state: "MutablePerConnectionState",
|
||||||
|
) -> int:
|
||||||
|
"""Persist updates to the per-connection state for a sliding sync
|
||||||
|
connection.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The connection position of the newly persisted state.
|
||||||
|
"""
|
||||||
|
|
||||||
|
store = cast("DataStore", self)
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"persist_per_connection_state",
|
||||||
|
self.persist_per_connection_state_txn,
|
||||||
|
user_id=user_id,
|
||||||
|
device_id=device_id,
|
||||||
|
conn_id=conn_id,
|
||||||
|
previous_connection_position=previous_connection_position,
|
||||||
|
per_connection_state=await PerConnectionStateDB.from_state(
|
||||||
|
per_connection_state, store
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def persist_per_connection_state_txn(
|
||||||
|
self,
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
user_id: str,
|
||||||
|
device_id: str,
|
||||||
|
conn_id: str,
|
||||||
|
previous_connection_position: Optional[int],
|
||||||
|
per_connection_state: "PerConnectionStateDB",
|
||||||
|
) -> int:
|
||||||
|
# First we fetch the (or create) the connection key associated with the
|
||||||
|
# previous connection position.
|
||||||
|
if previous_connection_position is not None:
|
||||||
|
# The `previous_connection_position` is a user-supplied value, so we
|
||||||
|
# need to make sure that the one they supplied is actually theirs.
|
||||||
|
sql = """
|
||||||
|
SELECT connection_key
|
||||||
|
FROM sliding_sync_connection_positions
|
||||||
|
INNER JOIN sliding_sync_connections USING (connection_key)
|
||||||
|
WHERE
|
||||||
|
connection_position = ?
|
||||||
|
AND user_id = ? AND device_id = ? AND conn_id = ?
|
||||||
|
"""
|
||||||
|
txn.execute(
|
||||||
|
sql, (previous_connection_position, user_id, device_id, conn_id)
|
||||||
|
)
|
||||||
|
row = txn.fetchone()
|
||||||
|
if row is None:
|
||||||
|
raise SlidingSyncUnknownPosition()
|
||||||
|
|
||||||
|
(connection_key,) = row
|
||||||
|
else:
|
||||||
|
# We're restarting the connection, so we clear all existing
|
||||||
|
# connections. We do this here to ensure that if we get lots of
|
||||||
|
# one-shot requests we don't stack up lots of entries.
|
||||||
|
self.db_pool.simple_delete_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connections",
|
||||||
|
keyvalues={
|
||||||
|
"user_id": user_id,
|
||||||
|
"device_id": device_id,
|
||||||
|
"conn_id": conn_id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
(connection_key,) = self.db_pool.simple_insert_returning_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connections",
|
||||||
|
values={
|
||||||
|
"user_id": user_id,
|
||||||
|
"device_id": device_id,
|
||||||
|
"conn_id": conn_id,
|
||||||
|
"created_ts": self._clock.time_msec(),
|
||||||
|
},
|
||||||
|
returning=("connection_key",),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Define a new connection position for the updates
|
||||||
|
(connection_position,) = self.db_pool.simple_insert_returning_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_positions",
|
||||||
|
values={
|
||||||
|
"connection_key": connection_key,
|
||||||
|
"created_ts": self._clock.time_msec(),
|
||||||
|
},
|
||||||
|
returning=("connection_position",),
|
||||||
|
)
|
||||||
|
|
||||||
|
# We need to deduplicate the `required_state` JSON. We do this by
|
||||||
|
# fetching all JSON associated with the connection and comparing that
|
||||||
|
# with the updates to `required_state`
|
||||||
|
|
||||||
|
# Dict from required state json -> required state ID
|
||||||
|
required_state_to_id: Dict[str, int] = {}
|
||||||
|
if previous_connection_position is not None:
|
||||||
|
rows = self.db_pool.simple_select_list_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_required_state",
|
||||||
|
keyvalues={"connection_key": connection_key},
|
||||||
|
retcols=("required_state_id", "required_state"),
|
||||||
|
)
|
||||||
|
for required_state_id, required_state in rows:
|
||||||
|
required_state_to_id[required_state] = required_state_id
|
||||||
|
|
||||||
|
room_to_state_ids: Dict[str, int] = {}
|
||||||
|
unique_required_state: Dict[str, List[str]] = {}
|
||||||
|
for room_id, room_state in per_connection_state.room_configs.items():
|
||||||
|
serialized_state = json_encoder.encode(
|
||||||
|
# We store the required state as a sorted list of event type /
|
||||||
|
# state key tuples.
|
||||||
|
sorted(
|
||||||
|
(event_type, state_key)
|
||||||
|
for event_type, state_keys in room_state.required_state_map.items()
|
||||||
|
for state_key in state_keys
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
existing_state_id = required_state_to_id.get(serialized_state)
|
||||||
|
if existing_state_id is not None:
|
||||||
|
room_to_state_ids[room_id] = existing_state_id
|
||||||
|
else:
|
||||||
|
unique_required_state.setdefault(serialized_state, []).append(room_id)
|
||||||
|
|
||||||
|
# Insert any new `required_state` json we haven't previously seen.
|
||||||
|
for serialized_required_state, room_ids in unique_required_state.items():
|
||||||
|
(required_state_id,) = self.db_pool.simple_insert_returning_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_required_state",
|
||||||
|
values={
|
||||||
|
"connection_key": connection_key,
|
||||||
|
"required_state": serialized_required_state,
|
||||||
|
},
|
||||||
|
returning=("required_state_id",),
|
||||||
|
)
|
||||||
|
for room_id in room_ids:
|
||||||
|
room_to_state_ids[room_id] = required_state_id
|
||||||
|
|
||||||
|
# Copy over state from the previous connection position (we'll overwrite
|
||||||
|
# these rows with any changes).
|
||||||
|
if previous_connection_position is not None:
|
||||||
|
sql = """
|
||||||
|
INSERT INTO sliding_sync_connection_streams
|
||||||
|
(connection_position, stream, room_id, room_status, last_position)
|
||||||
|
SELECT ?, stream, room_id, room_status, last_position
|
||||||
|
FROM sliding_sync_connection_streams
|
||||||
|
WHERE connection_position = ?
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (connection_position, previous_connection_position))
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
INSERT INTO sliding_sync_connection_room_configs
|
||||||
|
(connection_position, room_id, timeline_limit, required_state_id)
|
||||||
|
SELECT ?, room_id, timeline_limit, required_state_id
|
||||||
|
FROM sliding_sync_connection_room_configs
|
||||||
|
WHERE connection_position = ?
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (connection_position, previous_connection_position))
|
||||||
|
|
||||||
|
# We now upsert the changes to the various streams.
|
||||||
|
key_values = []
|
||||||
|
value_values = []
|
||||||
|
for room_id, have_sent_room in per_connection_state.rooms._statuses.items():
|
||||||
|
key_values.append((connection_position, "rooms", room_id))
|
||||||
|
value_values.append(
|
||||||
|
(have_sent_room.status.value, have_sent_room.last_token)
|
||||||
|
)
|
||||||
|
|
||||||
|
for room_id, have_sent_room in per_connection_state.receipts._statuses.items():
|
||||||
|
key_values.append((connection_position, "receipts", room_id))
|
||||||
|
value_values.append(
|
||||||
|
(have_sent_room.status.value, have_sent_room.last_token)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.db_pool.simple_upsert_many_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_streams",
|
||||||
|
key_names=(
|
||||||
|
"connection_position",
|
||||||
|
"stream",
|
||||||
|
"room_id",
|
||||||
|
),
|
||||||
|
key_values=key_values,
|
||||||
|
value_names=(
|
||||||
|
"room_status",
|
||||||
|
"last_position",
|
||||||
|
),
|
||||||
|
value_values=value_values,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ... and upsert changes to the room configs.
|
||||||
|
keys = []
|
||||||
|
values = []
|
||||||
|
for room_id, room_config in per_connection_state.room_configs.items():
|
||||||
|
keys.append((connection_position, room_id))
|
||||||
|
values.append((room_config.timeline_limit, room_to_state_ids[room_id]))
|
||||||
|
|
||||||
|
self.db_pool.simple_upsert_many_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_room_configs",
|
||||||
|
key_names=(
|
||||||
|
"connection_position",
|
||||||
|
"room_id",
|
||||||
|
),
|
||||||
|
key_values=keys,
|
||||||
|
value_names=(
|
||||||
|
"timeline_limit",
|
||||||
|
"required_state_id",
|
||||||
|
),
|
||||||
|
value_values=values,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Persist changes to the room lists
|
||||||
|
for list_name, list_room_ids in per_connection_state.list_to_rooms.items():
|
||||||
|
self.db_pool.simple_delete_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_room_lists",
|
||||||
|
keyvalues={"connection_key": connection_key, "list_name": list_name},
|
||||||
|
)
|
||||||
|
self.db_pool.simple_insert_many_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_room_lists",
|
||||||
|
keys=("connection_key", "list_name", "room_id"),
|
||||||
|
values=[
|
||||||
|
(connection_key, list_name, room_id) for room_id in list_room_ids
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
return connection_position
|
||||||
|
|
||||||
|
@cached(iterable=True, max_entries=100000)
|
||||||
|
async def get_per_connection_state(
|
||||||
|
self, user_id: str, device_id: str, conn_id: str, connection_position: int
|
||||||
|
) -> "PerConnectionState":
|
||||||
|
"""Get the per-connection state for the given connection position."""
|
||||||
|
|
||||||
|
per_connection_state_db = await self.db_pool.runInteraction(
|
||||||
|
"get_per_connection_state",
|
||||||
|
self._get_per_connection_state_txn,
|
||||||
|
user_id=user_id,
|
||||||
|
device_id=device_id,
|
||||||
|
conn_id=conn_id,
|
||||||
|
connection_position=connection_position,
|
||||||
|
)
|
||||||
|
store = cast("DataStore", self)
|
||||||
|
return await per_connection_state_db.to_state(store)
|
||||||
|
|
||||||
|
def _get_per_connection_state_txn(
|
||||||
|
self,
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
user_id: str,
|
||||||
|
device_id: str,
|
||||||
|
conn_id: str,
|
||||||
|
connection_position: int,
|
||||||
|
) -> "PerConnectionStateDB":
|
||||||
|
# The `previous_connection_position` is a user-supplied value, so we
|
||||||
|
# need to make sure that the one they supplied is actually theirs.
|
||||||
|
sql = """
|
||||||
|
SELECT connection_key
|
||||||
|
FROM sliding_sync_connection_positions
|
||||||
|
INNER JOIN sliding_sync_connections USING (connection_key)
|
||||||
|
WHERE
|
||||||
|
connection_position = ?
|
||||||
|
AND user_id = ? AND device_id = ? AND conn_id = ?
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (connection_position, user_id, device_id, conn_id))
|
||||||
|
row = txn.fetchone()
|
||||||
|
if row is None:
|
||||||
|
raise SlidingSyncUnknownPosition()
|
||||||
|
|
||||||
|
(connection_key,) = row
|
||||||
|
|
||||||
|
# Now that we have seen the client has received and used the connection
|
||||||
|
# position, we can delete all the other connection positions.
|
||||||
|
sql = """
|
||||||
|
DELETE FROM sliding_sync_connection_positions
|
||||||
|
WHERE connection_key = ? AND connection_position != ?
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (connection_key, connection_position))
|
||||||
|
|
||||||
|
# Fetch and create a mapping from required state ID to the actual
|
||||||
|
# required state for the connection.
|
||||||
|
rows = self.db_pool.simple_select_list_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_required_state",
|
||||||
|
keyvalues={"connection_key": connection_key},
|
||||||
|
retcols=(
|
||||||
|
"required_state_id",
|
||||||
|
"required_state",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
required_state_map: Dict[int, Dict[str, Set[str]]] = {}
|
||||||
|
for row in rows:
|
||||||
|
state = required_state_map[row[0]] = {}
|
||||||
|
for event_type, state_keys in db_to_json(row[1]):
|
||||||
|
state[event_type] = set(state_keys)
|
||||||
|
|
||||||
|
# Get all the room configs, looking up the required state from the map
|
||||||
|
# above.
|
||||||
|
room_config_rows = self.db_pool.simple_select_list_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_room_configs",
|
||||||
|
keyvalues={"connection_position": connection_position},
|
||||||
|
retcols=(
|
||||||
|
"room_id",
|
||||||
|
"timeline_limit",
|
||||||
|
"required_state_id",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
room_configs: Dict[str, RoomSyncConfig] = {}
|
||||||
|
for (
|
||||||
|
room_id,
|
||||||
|
timeline_limit,
|
||||||
|
required_state_id,
|
||||||
|
) in room_config_rows:
|
||||||
|
room_configs[room_id] = RoomSyncConfig(
|
||||||
|
timeline_limit=timeline_limit,
|
||||||
|
required_state_map=required_state_map[required_state_id],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Now look up the per-room stream data.
|
||||||
|
rooms: Dict[str, HaveSentRoom[str]] = {}
|
||||||
|
receipts: Dict[str, HaveSentRoom[str]] = {}
|
||||||
|
|
||||||
|
receipt_rows = self.db_pool.simple_select_list_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_streams",
|
||||||
|
keyvalues={"connection_position": connection_position},
|
||||||
|
retcols=(
|
||||||
|
"stream",
|
||||||
|
"room_id",
|
||||||
|
"room_status",
|
||||||
|
"last_position",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for stream, room_id, room_status, last_position in receipt_rows:
|
||||||
|
have_sent_room: HaveSentRoom[str] = HaveSentRoom(
|
||||||
|
status=HaveSentRoomFlag(room_status), last_token=last_position
|
||||||
|
)
|
||||||
|
if stream == "rooms":
|
||||||
|
rooms[room_id] = have_sent_room
|
||||||
|
elif stream == "receipts":
|
||||||
|
receipts[room_id] = have_sent_room
|
||||||
|
|
||||||
|
# Fetch any stored lists for the connection
|
||||||
|
rows = self.db_pool.simple_select_list_txn(
|
||||||
|
txn,
|
||||||
|
table="sliding_sync_connection_room_lists",
|
||||||
|
keyvalues={
|
||||||
|
connection_key: connection_key,
|
||||||
|
},
|
||||||
|
retcols=("list_name", "room_id"),
|
||||||
|
)
|
||||||
|
list_to_rooms: Dict[str, Set[str]] = {}
|
||||||
|
for list_name, room_id in rows:
|
||||||
|
list_to_rooms.setdefault(list_name, set()).add(room_id)
|
||||||
|
|
||||||
|
return PerConnectionStateDB(
|
||||||
|
rooms=RoomStatusMap(rooms),
|
||||||
|
receipts=RoomStatusMap(receipts),
|
||||||
|
room_configs=room_configs,
|
||||||
|
list_to_rooms=list_to_rooms,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(auto_attribs=True, frozen=True)
|
||||||
|
class PerConnectionStateDB:
|
||||||
|
"""An equivalent to `PerConnectionState` that holds data in a format stored
|
||||||
|
in the DB.
|
||||||
|
|
||||||
|
The principle difference is that the tokens for the different streams are
|
||||||
|
serialized to strings.
|
||||||
|
|
||||||
|
When persisting this *only* contains updates to the state.
|
||||||
|
"""
|
||||||
|
|
||||||
|
rooms: "RoomStatusMap[str]"
|
||||||
|
receipts: "RoomStatusMap[str]"
|
||||||
|
|
||||||
|
room_configs: Mapping[str, "RoomSyncConfig"]
|
||||||
|
list_to_rooms: Mapping[str, AbstractSet[str]]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def from_state(
|
||||||
|
per_connection_state: "MutablePerConnectionState", store: "DataStore"
|
||||||
|
) -> "PerConnectionStateDB":
|
||||||
|
"""Convert from a standard `PerConnectionState`"""
|
||||||
|
rooms = {
|
||||||
|
room_id: HaveSentRoom(
|
||||||
|
status=status.status,
|
||||||
|
last_token=(
|
||||||
|
await status.last_token.to_string(store)
|
||||||
|
if status.last_token is not None
|
||||||
|
else None
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for room_id, status in per_connection_state.rooms.get_updates().items()
|
||||||
|
}
|
||||||
|
|
||||||
|
receipts = {
|
||||||
|
room_id: HaveSentRoom(
|
||||||
|
status=status.status,
|
||||||
|
last_token=(
|
||||||
|
await status.last_token.to_string(store)
|
||||||
|
if status.last_token is not None
|
||||||
|
else None
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for room_id, status in per_connection_state.receipts.get_updates().items()
|
||||||
|
}
|
||||||
|
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"rooms": rooms,
|
||||||
|
"receipts": receipts,
|
||||||
|
"room_configs": per_connection_state.room_configs.maps[0],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return PerConnectionStateDB(
|
||||||
|
rooms=RoomStatusMap(rooms),
|
||||||
|
receipts=RoomStatusMap(receipts),
|
||||||
|
room_configs=per_connection_state.room_configs.maps[0],
|
||||||
|
list_to_rooms=per_connection_state.list_to_rooms.maps[0],
|
||||||
|
)
|
||||||
|
|
||||||
|
async def to_state(self, store: "DataStore") -> "PerConnectionState":
|
||||||
|
"""Convert into a standard `PerConnectionState`"""
|
||||||
|
rooms = {
|
||||||
|
room_id: HaveSentRoom(
|
||||||
|
status=status.status,
|
||||||
|
last_token=(
|
||||||
|
await RoomStreamToken.parse(store, status.last_token)
|
||||||
|
if status.last_token is not None
|
||||||
|
else None
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for room_id, status in self.rooms._statuses.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
receipts = {
|
||||||
|
room_id: HaveSentRoom(
|
||||||
|
status=status.status,
|
||||||
|
last_token=(
|
||||||
|
await MultiWriterStreamToken.parse(store, status.last_token)
|
||||||
|
if status.last_token is not None
|
||||||
|
else None
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for room_id, status in self.receipts._statuses.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
return PerConnectionState(
|
||||||
|
rooms=RoomStatusMap(rooms),
|
||||||
|
receipts=RoomStatusMap(receipts),
|
||||||
|
room_configs=self.room_configs,
|
||||||
|
list_to_rooms=self.list_to_rooms,
|
||||||
|
)
|
||||||
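The two converters above are symmetric: `from_state` flattens each stream token to a string before it is written, and `to_state` re-parses the stored strings (`RoomStreamToken` for the rooms stream, `MultiWriterStreamToken` for receipts). A minimal sketch of that round-trip, using simplified stand-ins for the real `HaveSentRoom` and token classes:

# A minimal sketch of the token round-trip; HaveSentRoomSketch and the
# "s<pos>" token format are simplified stand-ins for the real classes.
from typing import Optional

import attr


@attr.s(auto_attribs=True, frozen=True)
class HaveSentRoomSketch:
    status: str  # "live" or "previously"
    last_token: Optional[str]  # serialized stream token, e.g. "s123"


def to_db(status: str, stream_pos: Optional[int]) -> HaveSentRoomSketch:
    # Mirrors `from_state`: tokens become strings before hitting the DB.
    return HaveSentRoomSketch(
        status=status,
        last_token=f"s{stream_pos}" if stream_pos is not None else None,
    )


def from_db(row: HaveSentRoomSketch) -> Optional[int]:
    # Mirrors `to_state`: stored strings are parsed back into token objects.
    return int(row.last_token[1:]) if row.last_token is not None else None


assert from_db(to_db("previously", 123)) == 123
assert from_db(to_db("live", None)) is None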
@@ -161,29 +161,38 @@ class StateDeltasStore(SQLBaseStore):
             self._get_max_stream_id_in_current_state_deltas_txn,
         )
 
-    @trace
-    async def get_current_state_deltas_for_room(
-        self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
-    ) -> List[StateDelta]:
-        """Get the state deltas between two tokens."""
-
-        if not self._curr_state_delta_stream_cache.has_entity_changed(
-            room_id, from_token.stream
-        ):
-            return []
-
-        def get_current_state_deltas_for_room_txn(
-            txn: LoggingTransaction,
-        ) -> List[StateDelta]:
-            sql = """
-                SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
-                FROM current_state_delta_stream
-                WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
-                ORDER BY stream_id ASC
-            """
-            txn.execute(
-                sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
-            )
+    def get_current_state_deltas_for_room_txn(
+        self,
+        txn: LoggingTransaction,
+        room_id: str,
+        *,
+        from_token: Optional[RoomStreamToken],
+        to_token: Optional[RoomStreamToken],
+    ) -> List[StateDelta]:
+        """
+        Get the state deltas between two tokens.
+
+        (> `from_token` and <= `to_token`)
+        """
+        from_clause = ""
+        from_args = []
+        if from_token is not None:
+            from_clause = "AND ? < stream_id"
+            from_args = [from_token.stream]
+
+        to_clause = ""
+        to_args = []
+        if to_token is not None:
+            to_clause = "AND stream_id <= ?"
+            to_args = [to_token.get_max_stream_pos()]
+
+        sql = f"""
+            SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
+            FROM current_state_delta_stream
+            WHERE room_id = ? {from_clause} {to_clause}
+            ORDER BY stream_id ASC
+        """
+        txn.execute(sql, [room_id] + from_args + to_args)
 
         return [
             StateDelta(
@@ -198,8 +207,34 @@ class StateDeltasStore(SQLBaseStore):
             if _filter_results_by_stream(from_token, to_token, row[0], row[1])
         ]
 
+    @trace
+    async def get_current_state_deltas_for_room(
+        self,
+        room_id: str,
+        *,
+        from_token: Optional[RoomStreamToken],
+        to_token: Optional[RoomStreamToken],
+    ) -> List[StateDelta]:
+        """
+        Get the state deltas between two tokens.
+
+        (> `from_token` and <= `to_token`)
+        """
+
+        if (
+            from_token is not None
+            and not self._curr_state_delta_stream_cache.has_entity_changed(
+                room_id, from_token.stream
+            )
+        ):
+            return []
+
         return await self.db_pool.runInteraction(
-            "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
+            "get_current_state_deltas_for_room",
+            self.get_current_state_deltas_for_room_txn,
+            room_id,
+            from_token=from_token,
+            to_token=to_token,
         )
 
     @trace
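With both bounds now optional and keyword-only, callers can request a fully bounded range or leave either end open. Hypothetical call sites, assuming `store` mixes in `StateDeltasStore` and the tokens are `RoomStreamToken`s:

# Bounds are (> from_token and <= to_token); a None bound means "unbounded".
deltas = await store.get_current_state_deltas_for_room(
    room_id, from_token=from_token, to_token=to_token
)

# All state deltas up to `to_token`, from the start of the stream:
all_deltas = await store.get_current_state_deltas_for_room(
    room_id, from_token=None, to_token=to_token
)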
@@ -50,6 +50,7 @@ from typing import (
     Dict,
     Iterable,
     List,
+    Mapping,
     Optional,
     Protocol,
     Set,
@@ -80,7 +81,7 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
@@ -1263,12 +1264,76 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return None
 
+    async def get_last_event_pos_in_room(
+        self,
+        room_id: str,
+        event_types: Optional[StrCollection] = None,
+    ) -> Optional[Tuple[str, PersistedEventPosition]]:
+        """
+        Returns the ID and event position of the last event in a room.
+
+        Based on `get_last_event_pos_in_room_before_stream_ordering(...)`
+
+        Args:
+            room_id
+            event_types: Optional allowlist of event types to filter by
+
+        Returns:
+            The ID of the most recent event and its position, or None if there are no
+            events in the room that match the given event types.
+        """
+
+        def _get_last_event_pos_in_room_txn(
+            txn: LoggingTransaction,
+        ) -> Optional[Tuple[str, PersistedEventPosition]]:
+            event_type_clause = ""
+            event_type_args: List[str] = []
+            if event_types is not None and len(event_types) > 0:
+                event_type_clause, event_type_args = make_in_list_sql_clause(
+                    txn.database_engine, "type", event_types
+                )
+                event_type_clause = f"AND {event_type_clause}"
+
+            sql = f"""
+                SELECT event_id, stream_ordering, instance_name
+                FROM events
+                LEFT JOIN rejections USING (event_id)
+                WHERE room_id = ?
+                    {event_type_clause}
+                    AND NOT outlier
+                    AND rejections.event_id IS NULL
+                ORDER BY stream_ordering DESC
+                LIMIT 1
+            """
+
+            txn.execute(
+                sql,
+                [room_id] + event_type_args,
+            )
+
+            row = cast(Optional[Tuple[str, int, str]], txn.fetchone())
+            if row is not None:
+                event_id, stream_ordering, instance_name = row
+
+                return event_id, PersistedEventPosition(
+                    # If instance_name is null we default to "master"
+                    instance_name or "master",
+                    stream_ordering,
+                )
+
+            return None
+
+        return await self.db_pool.runInteraction(
+            "get_last_event_pos_in_room",
+            _get_last_event_pos_in_room_txn,
+        )
+
     @trace
     async def get_last_event_pos_in_room_before_stream_ordering(
         self,
         room_id: str,
         end_token: RoomStreamToken,
-        event_types: Optional[Collection[str]] = None,
+        event_types: Optional[StrCollection] = None,
     ) -> Optional[Tuple[str, PersistedEventPosition]]:
         """
         Returns the ID and event position of the last event in a room at or before a
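A hypothetical call site for the new helper, assuming a `store` with `StreamWorkerStore` mixed in; the `event_types` allowlist is optional:

# Hypothetical usage: the most recent message-ish event in a room, or None.
last = await store.get_last_event_pos_in_room(
    room_id, event_types=["m.room.message", "m.room.encrypted"]
)
if last is not None:
    event_id, pos = last  # pos is a PersistedEventPosition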
@@ -1382,7 +1447,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         """
 
         min_token = end_token.stream
-        max_token = end_token.get_max_stream_pos()
         results: Dict[str, int] = {}
 
         # First, we check for the rooms in the stream change cache to see if we
@@ -1395,49 +1459,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             else:
                 missing_room_ids.add(room_id)
 
+        if not missing_room_ids:
+            return results
+
         # Next, we query the stream position from the DB. At first we fetch all
         # positions less than the *max* stream pos in the token, then filter
         # them down. We do this as a) this is a cheaper query, and b) the vast
         # majority of rooms will have a latest token from before the min stream
         # pos.
 
-        def bulk_get_last_event_pos_txn(
-            txn: LoggingTransaction, batch_room_ids: StrCollection
-        ) -> Dict[str, int]:
-            # This query fetches the latest stream position in the rooms before
-            # the given max position.
-            clause, args = make_in_list_sql_clause(
-                self.database_engine, "room_id", batch_room_ids
-            )
-            sql = f"""
-                SELECT room_id, (
-                    SELECT stream_ordering FROM events AS e
-                    LEFT JOIN rejections USING (event_id)
-                    WHERE e.room_id = r.room_id
-                        AND stream_ordering <= ?
-                        AND NOT outlier
-                        AND rejection_reason IS NULL
-                    ORDER BY stream_ordering DESC
-                    LIMIT 1
-                )
-                FROM rooms AS r
-                WHERE {clause}
-            """
-            txn.execute(sql, [max_token] + args)
-            return {row[0]: row[1] for row in txn}
-
-        recheck_rooms: Set[str] = set()
-        for batched in batch_iter(missing_room_ids, 1000):
-            result = await self.db_pool.runInteraction(
-                "bulk_get_last_event_pos_in_room_before_stream_ordering",
-                bulk_get_last_event_pos_txn,
-                batched,
-            )
+        uncapped_results = await self._bulk_get_max_event_pos(missing_room_ids)
 
-            # Check that the stream position for the rooms are from before the
-            # minimum position of the token. If not then we need to fetch more
-            # rows.
-            for room_id, stream in result.items():
-                if stream <= min_token:
-                    results[room_id] = stream
-                else:
+        # Check that the stream positions for the rooms are from before the
+        # minimum position of the token. If not then we need to fetch more
+        # rows.
+        recheck_rooms: Set[str] = set()
+        for room_id, stream in uncapped_results.items():
+            if stream <= min_token:
+                results[room_id] = stream
+            else:
@@ -1446,49 +1483,80 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         if not recheck_rooms:
             return results
 
-        # For the remaining rooms we need to fetch all rows between the min and
-        # max stream positions in the end token, and filter out the rows that
-        # are after the end token.
-        #
-        # This query should be fast as the range between the min and max should
-        # be small.
-
-        def bulk_get_last_event_pos_recheck_txn(
-            txn: LoggingTransaction, batch_room_ids: StrCollection
-        ) -> Dict[str, int]:
-            clause, args = make_in_list_sql_clause(
-                self.database_engine, "room_id", batch_room_ids
-            )
-            sql = f"""
-                SELECT room_id, instance_name, stream_ordering
-                FROM events
-                WHERE ? < stream_ordering AND stream_ordering <= ?
-                    AND NOT outlier
-                    AND rejection_reason IS NULL
-                    AND {clause}
-                ORDER BY stream_ordering ASC
-            """
-            txn.execute(sql, [min_token, max_token] + args)
-
-            # We take the max stream ordering that is less than the token. Since
-            # we ordered by stream ordering we just need to iterate through and
-            # take the last matching stream ordering.
-            txn_results: Dict[str, int] = {}
-            for row in txn:
-                room_id = row[0]
-                event_pos = PersistedEventPosition(row[1], row[2])
-                if not event_pos.persisted_after(end_token):
-                    txn_results[room_id] = event_pos.stream
-
-            return txn_results
-
-        for batched in batch_iter(recheck_rooms, 1000):
-            recheck_result = await self.db_pool.runInteraction(
-                "bulk_get_last_event_pos_in_room_before_stream_ordering_recheck",
-                bulk_get_last_event_pos_recheck_txn,
-                batched,
-            )
-            results.update(recheck_result)
+        for room_id in recheck_rooms:
+            result = await self.get_last_event_pos_in_room_before_stream_ordering(
+                room_id, end_token
+            )
+            if result is not None:
+                results[room_id] = result[1].stream
 
         return results
 
+    @cached()
+    async def _get_max_event_pos(self, room_id: str) -> int:
+        raise NotImplementedError()
+
+    @cachedList(cached_method_name="_get_max_event_pos", list_name="room_ids")
+    async def _bulk_get_max_event_pos(
+        self, room_ids: StrCollection
+    ) -> Mapping[str, int]:
+        """Fetch the max position of a persisted event in the room."""
+
+        now_token = self.get_room_max_token()
+        max_pos = now_token.get_max_stream_pos()
+
+        results: Dict[str, int] = {}
+        missing_room_ids: Set[str] = set()
+        for room_id in room_ids:
+            stream_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id)
+            if stream_pos is not None:
+                results[room_id] = stream_pos
+            else:
+                missing_room_ids.add(room_id)
+
+        if not missing_room_ids:
+            return results
+
+        def bulk_get_max_event_pos_txn(
+            txn: LoggingTransaction, batched_room_ids: StrCollection
+        ) -> Dict[str, int]:
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", batched_room_ids
+            )
+            sql = f"""
+                SELECT room_id, (
+                    SELECT stream_ordering FROM events AS e
+                    LEFT JOIN rejections USING (event_id)
+                    WHERE e.room_id = r.room_id
+                        AND e.stream_ordering <= ?
+                        AND NOT outlier
+                        AND rejection_reason IS NULL
+                    ORDER BY stream_ordering DESC
+                    LIMIT 1
+                )
+                FROM rooms AS r
+                WHERE {clause}
+            """
+            txn.execute(sql, [max_pos] + args)
+            return {row[0]: row[1] for row in txn}
+
+        recheck_rooms: Set[str] = set()
+        for batched in batch_iter(missing_room_ids, 1000):
+            batch_results = await self.db_pool.runInteraction(
+                "_bulk_get_max_event_pos", bulk_get_max_event_pos_txn, batched
+            )
+            for room_id, stream_ordering in batch_results.items():
+                if stream_ordering <= now_token.stream:
+                    results[room_id] = stream_ordering
+                else:
+                    recheck_rooms.add(room_id)
+
+        for room_id in recheck_rooms:
+            result = await self.get_last_event_pos_in_room_before_stream_ordering(
+                room_id, now_token
+            )
+            if result is not None:
+                results[room_id] = result[1].stream
+
+        return results
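The `@cached()` stub raising `NotImplementedError` is deliberate: single-key lookups are never executed directly, they only give `@cachedList` a per-room cache to read from and fill. A rough sketch of the pairing, with a toy lookup standing in for the real DB query:

# Rough sketch of the @cached/@cachedList pairing used above. On a call to
# the batch method, keys already cached under `_get_thing` are served from
# the cache; only the misses reach the body, and the returned mapping is
# written back into the `_get_thing` cache entry-by-entry.
from synapse.util.caches.descriptors import cached, cachedList


class ExampleStore:
    @cached()
    async def _get_thing(self, key: str) -> int:
        raise NotImplementedError()

    @cachedList(cached_method_name="_get_thing", list_name="keys")
    async def _bulk_get_things(self, keys):
        # Toy stand-in for a bulk DB query over the missing keys.
        return {key: len(key) for key in keys}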
@@ -28,6 +28,11 @@ if TYPE_CHECKING:
     from synapse.storage.database import LoggingDatabaseConnection
 
 
+# A string that will be replaced with the appropriate auto increment directive
+# for the database engine, expands to an auto incrementing integer primary key.
+AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER = "$%AUTO_INCREMENT_PRIMARY_KEY%$"
+
+
 class IsolationLevel(IntEnum):
     READ_COMMITTED: int = 1
     REPEATABLE_READ: int = 2
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Optional, Tuple, cast
 import psycopg2.extensions
 
 from synapse.storage.engines._base import (
+    AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER,
     BaseDatabaseEngine,
     IncorrectDatabaseSetup,
     IsolationLevel,
@@ -256,4 +257,10 @@ class PostgresEngine(
         executing the script in its own transaction. The script transaction is
         left open and it is the responsibility of the caller to commit it.
         """
+        # Replace auto increment placeholder with the appropriate directive
+        script = script.replace(
+            AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER,
+            "BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY",
+        )
+
         cursor.execute(f"COMMIT; BEGIN TRANSACTION; {script}")
@@ -25,6 +25,7 @@ import threading
 from typing import TYPE_CHECKING, Any, List, Mapping, Optional
 
 from synapse.storage.engines import BaseDatabaseEngine
+from synapse.storage.engines._base import AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER
 from synapse.storage.types import Cursor
 
 if TYPE_CHECKING:
@@ -168,6 +169,11 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
         > first. No other implicit transaction control is performed; any transaction
         > control must be added to sql_script.
         """
+        # Replace auto increment placeholder with the appropriate directive
+        script = script.replace(
+            AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER, "INTEGER PRIMARY KEY AUTOINCREMENT"
+        )
+
         # The implementation of `executescript` can be found at
         # https://github.com/python/cpython/blob/3.11/Modules/_sqlite/cursor.c#L1035.
         cursor.executescript(f"BEGIN TRANSACTION; {script}")
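Both engines rewrite the same placeholder when executing a schema script, so one delta file can portably declare an auto-incrementing primary key. Roughly, assuming a toy `example` table:

# Roughly what the two engines do with the placeholder (string substitution
# only; the table name here is a made-up example).
AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER = "$%AUTO_INCREMENT_PRIMARY_KEY%$"

schema = f"CREATE TABLE example(id {AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER}, name TEXT);"

postgres_sql = schema.replace(
    AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER,
    "BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY",
)
sqlite_sql = schema.replace(
    AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER,
    "INTEGER PRIMARY KEY AUTOINCREMENT",
)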
@@ -19,7 +19,7 @@
 #
 #
 
-SCHEMA_VERSION = 86  # remember to update the list below when updating
+SCHEMA_VERSION = 87  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -142,6 +142,10 @@ Changes in SCHEMA_VERSION = 85
 
 Changes in SCHEMA_VERSION = 86
     - Add a column `authenticated` to the tables `local_media_repository` and `remote_media_cache`
+
+Changes in SCHEMA_VERSION = 87
+    - Add tables to store Sliding Sync data for quick filtering/sorting
+      (`sliding_sync_joined_rooms`, `sliding_sync_membership_snapshots`)
 """
 
@@ -0,0 +1,153 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- This table is a list/queue used to keep track of which rooms need to be inserted into
+-- `sliding_sync_joined_rooms`. We do this to avoid reading from `current_state_events`
+-- during the background update to populate `sliding_sync_joined_rooms` which works but
+-- it takes a lot of work for the database to grab `DISTINCT` room_ids given how many
+-- state events there are for each room.
+CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms_to_recalculate(
+    room_id TEXT NOT NULL REFERENCES rooms(room_id),
+    PRIMARY KEY (room_id)
+);
+
+-- A table for storing room meta data (current state relevant to sliding sync) that the
+-- local server is still participating in (someone local is joined to the room).
+--
+-- We store the joined rooms in a separate table from `sliding_sync_membership_snapshots`
+-- because we need up-to-date information for joined rooms and it can be shared across
+-- everyone who is joined.
+--
+-- This table is kept in sync with `current_state_events` which means if the server is
+-- no longer participating in a room, the row will be deleted.
+CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms(
+    room_id TEXT NOT NULL REFERENCES rooms(room_id),
+    -- The `stream_ordering` of the most-recent/latest event in the room
+    event_stream_ordering BIGINT NOT NULL REFERENCES events(stream_ordering),
+    -- The `stream_ordering` of the last event according to the `bump_event_types`
+    bump_stamp BIGINT,
+    -- `m.room.create` -> `content.type` (current state)
+    --
+    -- Useful for the `spaces`/`not_spaces` filter in the Sliding Sync API
+    room_type TEXT,
+    -- `m.room.name` -> `content.name` (current state)
+    --
+    -- Useful for the room meta data and `room_name_like` filter in the Sliding Sync API
+    room_name TEXT,
+    -- `m.room.encryption` -> `content.algorithm` (current state)
+    --
+    -- Useful for the `is_encrypted` filter in the Sliding Sync API
+    is_encrypted BOOLEAN DEFAULT FALSE NOT NULL,
+    -- `m.room.tombstone` -> `content.replacement_room` (according to the current state at the
+    -- time of the membership).
+    --
+    -- Useful for the `include_old_rooms` functionality in the Sliding Sync API
+    tombstone_successor_room_id TEXT,
+    PRIMARY KEY (room_id)
+);
+
+-- So we can purge rooms easily.
+--
+-- The primary key is already `room_id`
+
+-- So we can sort by `stream_ordering`
+CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_joined_rooms_event_stream_ordering ON sliding_sync_joined_rooms(event_stream_ordering);
+
+-- A table for storing a snapshot of room meta data (historical current state relevant
+-- for sliding sync) at the time of a local user's membership. Only has rows for the
+-- latest membership event for a given local user in a room which matches
+-- `local_current_membership`.
+--
+-- We store all memberships including joins. This makes it easy to reference this table
+-- to find all memberships for a given user and shares the same semantics as
+-- `local_current_membership`. And we get to avoid some table maintenance; if we only
+-- stored non-joins, we would have to delete the row for the user when the user joins
+-- the room. Stripped state doesn't include the `m.room.tombstone` event, so we just
+-- assume that the room doesn't have a tombstone.
+--
+-- For remote invite/knocks where the server is not participating in the room, we will
+-- use stripped state events to populate this table. We assume that if any stripped
+-- state is given, it will include all possible stripped state event types. For
+-- example, if stripped state is given but `m.room.encryption` isn't included, we will
+-- assume that the room is not encrypted.
+--
+-- We don't include `bump_stamp` here because we can just use the `stream_ordering` from
+-- the membership event itself as the `bump_stamp`.
+CREATE TABLE IF NOT EXISTS sliding_sync_membership_snapshots(
+    room_id TEXT NOT NULL REFERENCES rooms(room_id),
+    user_id TEXT NOT NULL,
+    -- Useful to be able to tell leaves from kicks (where the `user_id` is different from the `sender`)
+    sender TEXT NOT NULL,
+    membership_event_id TEXT NOT NULL REFERENCES events(event_id),
+    membership TEXT NOT NULL,
+    -- This is an integer just to match `room_memberships` and also means we don't need
+    -- to do any casting.
+    forgotten INTEGER DEFAULT 0 NOT NULL,
+    -- `stream_ordering` of the `membership_event_id`
+    event_stream_ordering BIGINT NOT NULL REFERENCES events(stream_ordering),
+    -- For remote invites/knocks that don't include any stripped state, we want to be
+    -- able to distinguish between a room with `None` as a valid value for some state and
+    -- a room where the state is completely unknown. Basically, this should be True unless
+    -- no stripped state was provided for a remote invite/knock (False).
+    has_known_state BOOLEAN DEFAULT FALSE NOT NULL,
+    -- `m.room.create` -> `content.type` (according to the current state at the time of
+    -- the membership).
+    --
+    -- Useful for the `spaces`/`not_spaces` filter in the Sliding Sync API
+    room_type TEXT,
+    -- `m.room.name` -> `content.name` (according to the current state at the time of
+    -- the membership).
+    --
+    -- Useful for the room meta data and `room_name_like` filter in the Sliding Sync API
+    room_name TEXT,
+    -- `m.room.encryption` -> `content.algorithm` (according to the current state at the
+    -- time of the membership).
+    --
+    -- Useful for the `is_encrypted` filter in the Sliding Sync API
+    is_encrypted BOOLEAN DEFAULT FALSE NOT NULL,
+    -- `m.room.tombstone` -> `content.replacement_room` (according to the current state at the
+    -- time of the membership).
+    --
+    -- Useful for the `include_old_rooms` functionality in the Sliding Sync API
+    tombstone_successor_room_id TEXT,
+    PRIMARY KEY (room_id, user_id)
+);
+
+-- So we can purge rooms easily.
+--
+-- Since we're using a multi-column index as the primary key (room_id, user_id), the
+-- first index column (room_id) is always usable for searching so we don't need to
+-- create a separate index for it.
+--
+-- CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_room_id ON sliding_sync_membership_snapshots(room_id);
+
+-- So we can fetch all rooms for a given user
+CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_sync_membership_snapshots(user_id);
+-- So we can sort by `stream_ordering`
+CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_event_stream_ordering ON sliding_sync_membership_snapshots(event_stream_ordering);
+
+
+-- Add a series of background updates to populate the new `sliding_sync_joined_rooms` table:
+--
+-- 1. Add a background update to prefill `sliding_sync_joined_rooms_to_recalculate`.
+--    We do a one-shot bulk insert from the `rooms` table to prefill.
+-- 2. Add a background update to populate the new `sliding_sync_joined_rooms` table
+--
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8701, 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update', '{}');
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+    (8701, 'sliding_sync_joined_rooms_bg_update', '{}', 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update');
+
+-- Add a background update to populate the new `sliding_sync_membership_snapshots` table
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8701, 'sliding_sync_membership_snapshots_bg_update', '{}');
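Because the relevant current state is denormalized onto a single row per joined room, sliding sync filters can become plain indexed lookups instead of scans over `current_state_events`. A hypothetical query for the `is_encrypted` filter, assuming a `store` that exposes the usual `db_pool.simple_select_list` helper:

# Hypothetical: fetch encrypted joined rooms straight from the new table,
# rather than walking each room's current state events.
async def get_encrypted_joined_rooms(store):
    return await store.db_pool.simple_select_list(
        table="sliding_sync_joined_rooms",
        keyvalues={"is_encrypted": True},
        retcols=("room_id", "room_name", "bump_stamp"),
        desc="get_encrypted_joined_rooms",
    )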
@@ -0,0 +1,78 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+
+-- Table to track active sliding sync connections.
+--
+-- A new connection will be created for every sliding sync request without a
+-- `since` token for a given `conn_id` for a device.
+--
+-- Once a new connection is created and used we delete all other connections for
+-- the `conn_id`.
+CREATE TABLE sliding_sync_connections(
+    connection_key $%AUTO_INCREMENT_PRIMARY_KEY%$,
+    user_id TEXT NOT NULL,
+    device_id TEXT NOT NULL,
+    conn_id TEXT NOT NULL,
+    created_ts BIGINT NOT NULL
+);
+
+CREATE INDEX sliding_sync_connections_idx ON sliding_sync_connections(user_id, device_id, conn_id);
+
+-- We track per-connection state by associating changes to the state with
+-- connection positions. This ensures that we correctly track state even if we
+-- see retries of requests.
+--
+-- If the client starts a "new" connection (by not specifying a since token),
+-- we'll clear out the other connections (to ensure that we don't end up with
+-- lots of connection keys).
+CREATE TABLE sliding_sync_connection_positions(
+    connection_position $%AUTO_INCREMENT_PRIMARY_KEY%$,
+    connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
+    created_ts BIGINT NOT NULL
+);
+
+CREATE INDEX sliding_sync_connection_positions_key ON sliding_sync_connection_positions(connection_key);
+
+
+-- To save space we deduplicate the `required_state` json by assigning IDs to
+-- different values.
+CREATE TABLE sliding_sync_connection_required_state(
+    required_state_id $%AUTO_INCREMENT_PRIMARY_KEY%$,
+    connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
+    required_state TEXT NOT NULL  -- We store this as a json list of event type / state key tuples.
+);
+
+CREATE INDEX sliding_sync_connection_required_state_conn_pos ON sliding_sync_connection_required_state(connection_key);
+
+
+-- Stores the room configs we have seen for rooms in a connection.
+CREATE TABLE sliding_sync_connection_room_configs(
+    connection_position BIGINT NOT NULL REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE,
+    room_id TEXT NOT NULL,
+    timeline_limit BIGINT NOT NULL,
+    required_state_id BIGINT NOT NULL REFERENCES sliding_sync_connection_required_state(required_state_id)
+);
+
+CREATE UNIQUE INDEX sliding_sync_connection_room_configs_idx ON sliding_sync_connection_room_configs(connection_position, room_id);
+
+-- Stores what data we have sent for given streams down given connections.
+CREATE TABLE sliding_sync_connection_streams(
+    connection_position BIGINT NOT NULL REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE,
+    stream TEXT NOT NULL,  -- e.g. "events" or "receipts"
+    room_id TEXT NOT NULL,
+    room_status TEXT NOT NULL,  -- "live" or "previously", i.e. the `HaveSentRoomFlag` value
+    last_position TEXT  -- For "previously" the token for the stream we have sent up to.
+);
+
+CREATE UNIQUE INDEX sliding_sync_connection_streams_idx ON sliding_sync_connection_streams(connection_position, room_id, stream);
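Deduplication of `required_state` relies on equal maps serializing to equal JSON strings, so the write path can reuse an existing row's ID under the same `connection_key` before inserting a new one. A sketch of that path, with hypothetical helper names (the real code goes through the store's insert helpers rather than raw SQL):

# Sketch of required_state deduplication; helper names are hypothetical.
# The map {event_type: {state_key, ...}} is serialized to a JSON list of
# [event_type, [state_key, ...]] pairs, matching how it is read back above.
import json


def serialize_required_state(required_state_map):
    return json.dumps(
        [
            [event_type, sorted(state_keys)]
            for event_type, state_keys in sorted(required_state_map.items())
        ]
    )


def get_or_insert_required_state_id(txn, connection_key, required_state_map):
    serialized = serialize_required_state(required_state_map)
    txn.execute(
        """
        SELECT required_state_id FROM sliding_sync_connection_required_state
        WHERE connection_key = ? AND required_state = ?
        """,
        (connection_key, serialized),
    )
    row = txn.fetchone()
    if row is not None:
        return row[0]
    txn.execute(
        """
        INSERT INTO sliding_sync_connection_required_state
            (connection_key, required_state)
        VALUES (?, ?)
        """,
        (connection_key, serialized),
    )
    # Sketch only: assumes the cursor exposes the auto-generated row ID.
    return txn.lastrowid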
@@ -0,0 +1,22 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+
+-- Stores the room lists for a connection
+CREATE TABLE sliding_sync_connection_room_lists(
+    connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
+    list_name TEXT NOT NULL,
+    room_id TEXT NOT NULL
+);
+
+CREATE INDEX sliding_sync_connection_room_lists_idx ON sliding_sync_connection_room_lists(connection_key);
@@ -17,33 +17,9 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
-from enum import Enum
-from typing import TYPE_CHECKING, Dict, Final, List, Mapping, Optional, Sequence, Tuple
-
-import attr
-from typing_extensions import TypedDict
-
-from synapse._pydantic_compat import HAS_PYDANTIC_V2
-
-if TYPE_CHECKING or HAS_PYDANTIC_V2:
-    from pydantic.v1 import Extra
-else:
-    from pydantic import Extra
-
-from synapse.events import EventBase
-from synapse.types import (
-    DeviceListUpdates,
-    JsonDict,
-    JsonMapping,
-    Requester,
-    SlidingSyncStreamToken,
-    StreamToken,
-    UserID,
-)
-from synapse.types.rest.client import SlidingSyncBody
-
-if TYPE_CHECKING:
-    from synapse.handlers.relations import BundledAggregations
+from typing import List, Optional, TypedDict
 
 
 class ShutdownRoomParams(TypedDict):
@@ -101,335 +77,3 @@ class ShutdownRoomResponse(TypedDict):
     failed_to_kick_users: List[str]
     local_aliases: List[str]
     new_room_id: Optional[str]
-
-
-class SlidingSyncConfig(SlidingSyncBody):
-    """
-    Inherit from `SlidingSyncBody` since we need all of the same fields and add a few
-    extra fields that we need in the handler
-    """
-
-    user: UserID
-    requester: Requester
-
-    # Pydantic config
-    class Config:
-        # By default, ignore fields that we don't recognise.
-        extra = Extra.ignore
-        # By default, don't allow fields to be reassigned after parsing.
-        allow_mutation = False
-        # Allow custom types like `UserID` to be used in the model
-        arbitrary_types_allowed = True
-
-
-class OperationType(Enum):
-    """
-    Represents the operation types in a Sliding Sync window.
-
-    Attributes:
-        SYNC: Sets a range of entries. Clients SHOULD discard what they previously knew about
-            entries in this range.
-        INSERT: Sets a single entry. If the position is not empty then clients MUST move
-            entries to the left or the right depending on where the closest empty space is.
-        DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
-            places.
-        INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
-            offline support, but they should be treated as empty when additional operations
-            which concern indexes in the range arrive from the server.
-    """
-
-    SYNC: Final = "SYNC"
-    INSERT: Final = "INSERT"
-    DELETE: Final = "DELETE"
-    INVALIDATE: Final = "INVALIDATE"
-
-
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class SlidingSyncResult:
-    """
-    The Sliding Sync result to be serialized to JSON for a response.
-
-    Attributes:
-        next_pos: The next position token in the sliding window to request (next_batch).
-        lists: Sliding window API. A map of list key to list results.
-        rooms: Room subscription API. A map of room ID to room results.
-        extensions: Extensions API. A map of extension key to extension results.
-    """
-
-    @attr.s(slots=True, frozen=True, auto_attribs=True)
-    class RoomResult:
-        """
-        Attributes:
-            name: Room name or calculated room name.
-            avatar: Room avatar
-            heroes: List of stripped membership events (containing `user_id` and optionally
-                `avatar_url` and `displayname`) for the users used to calculate the room name.
-            is_dm: Flag to specify whether the room is a direct-message room (most likely
-                between two people).
-            initial: Flag which is set when this is the first time the server is sending this
-                data on this connection. Clients can use this flag to replace or update
-                their local state. When there is an update, servers MUST omit this flag
-                entirely and NOT send "initial":false as this is wasteful on bandwidth. The
-                absence of this flag means 'false'.
-            unstable_expanded_timeline: Flag which is set if we're returning more historic
-                events due to the timeline limit having increased. See "XXX: Odd behavior"
-                comment in `synapse.handlers.sliding_sync`.
-            required_state: The current state of the room
-            timeline: Latest events in the room. The last event is the most recent.
-            bundled_aggregations: A mapping of event ID to the bundled aggregations for
-                the timeline events above. This allows clients to show accurate reaction
-                counts (or edits, threads), even if some of the reaction events were skipped
-                over in a gappy sync.
-            stripped_state: Stripped state events (for rooms where the user is
-                invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
-                absent on joined/left rooms
-            prev_batch: A token that can be passed as a start parameter to the
-                `/rooms/<room_id>/messages` API to retrieve earlier messages.
-            limited: True if there are more events than `timeline_limit` looking
-                backwards from the `response.pos` to the `request.pos`.
-            num_live: The number of timeline events which have just occurred and are not historical.
-                The last N events are 'live' and should be treated as such. This is mostly
-                useful to determine whether a given @mention event should make a noise or not.
-                Clients cannot rely solely on the absence of `initial: true` to determine live
-                events because if a room not in the sliding window bumps into the window because
-                of an @mention it will have `initial: true` yet contain a single live event
-                (with potentially other old events in the timeline).
-            bump_stamp: The `stream_ordering` of the last event according to the
-                `bump_event_types`. This helps clients sort more readily without them
-                needing to pull in a bunch of the timeline to determine the last activity.
-                `bump_event_types` is a thing because for example, we don't want display
-                name changes to mark the room as unread and bump it to the top. For
-                encrypted rooms, we just have to consider any activity as a bump because we
-                can't see the content and the client has to figure it out for themselves.
-            joined_count: The number of users with membership of join, including the client's
-                own user ID. (same as sync v2 `m.joined_member_count`)
-            invited_count: The number of users with membership of invite. (same as sync v2
-                `m.invited_member_count`)
-            notification_count: The total number of unread notifications for this room. (same
-                as sync v2)
-            highlight_count: The number of unread notifications for this room with the highlight
-                flag set. (same as sync v2)
-        """
-
-        @attr.s(slots=True, frozen=True, auto_attribs=True)
-        class StrippedHero:
-            user_id: str
-            display_name: Optional[str]
-            avatar_url: Optional[str]
-
-        name: Optional[str]
-        avatar: Optional[str]
-        heroes: Optional[List[StrippedHero]]
-        is_dm: bool
-        initial: bool
-        unstable_expanded_timeline: bool
-        # Should be empty for invite/knock rooms with `stripped_state`
-        required_state: List[EventBase]
-        # Should be empty for invite/knock rooms with `stripped_state`
-        timeline_events: List[EventBase]
-        bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
-        # Optional because it's only relevant to invite/knock rooms
-        stripped_state: List[JsonDict]
-        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
-        prev_batch: Optional[StreamToken]
-        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
-        limited: Optional[bool]
-        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
-        num_live: Optional[int]
-        bump_stamp: int
-        joined_count: int
-        invited_count: int
-        notification_count: int
-        highlight_count: int
-
-        def __bool__(self) -> bool:
-            return (
-                # If this is the first time the client is seeing the room, we should not filter it out
-                # under any circumstance.
-                self.initial
-                # We need to let the client know if there are any new events
-                or bool(self.required_state)
-                or bool(self.timeline_events)
-                or bool(self.stripped_state)
-            )
-
-    @attr.s(slots=True, frozen=True, auto_attribs=True)
-    class SlidingWindowList:
-        """
-        Attributes:
-            count: The total number of entries in the list. Always present if this list
-                is.
-            ops: The sliding list operations to perform.
-        """
-
-        @attr.s(slots=True, frozen=True, auto_attribs=True)
-        class Operation:
-            """
-            Attributes:
-                op: The operation type to perform.
-                range: Which index positions are affected by this operation. These are
-                    both inclusive.
-                room_ids: Which room IDs are affected by this operation. These IDs match
-                    up to the positions in the `range`, so the last room ID in this list
-                    matches the 9th index. The room data is held in a separate object.
-            """
-
-            op: OperationType
-            range: Tuple[int, int]
-            room_ids: List[str]
-
-        count: int
-        ops: List[Operation]
-
-    @attr.s(slots=True, frozen=True, auto_attribs=True)
-    class Extensions:
-        """Responses for extensions
-
-        Attributes:
-            to_device: The to-device extension (MSC3885)
-            e2ee: The E2EE device extension (MSC3884)
-        """
-
-        @attr.s(slots=True, frozen=True, auto_attribs=True)
-        class ToDeviceExtension:
-            """The to-device extension (MSC3885)
-
-            Attributes:
-                next_batch: The to-device stream token the client should use
-                    to get more results
-                events: A list of to-device messages for the client
-            """
-
-            next_batch: str
-            events: Sequence[JsonMapping]
-
-            def __bool__(self) -> bool:
-                return bool(self.events)
-
-        @attr.s(slots=True, frozen=True, auto_attribs=True)
-        class E2eeExtension:
-            """The E2EE device extension (MSC3884)
-
-            Attributes:
-                device_list_updates: List of user_ids whose devices have changed or left (only
-                    present on incremental syncs).
-                device_one_time_keys_count: Map from key algorithm to the number of
-                    unclaimed one-time keys currently held on the server for this device. If
-                    an algorithm is unlisted, the count for that algorithm is assumed to be
-                    zero. If this entire parameter is missing, the count for all algorithms
-                    is assumed to be zero.
-                device_unused_fallback_key_types: List of unused fallback key algorithms
-                    for this device.
-            """
-
-            # Only present on incremental syncs
-            device_list_updates: Optional[DeviceListUpdates]
-            device_one_time_keys_count: Mapping[str, int]
-            device_unused_fallback_key_types: Sequence[str]
-
-            def __bool__(self) -> bool:
-                # Note that "signed_curve25519" is always returned in key count responses
-                # regardless of whether we uploaded any keys for it. This is necessary until
-                # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
-                #
-                # Also related:
-                # https://github.com/element-hq/element-android/issues/3725 and
-                # https://github.com/matrix-org/synapse/issues/10456
-                default_otk = self.device_one_time_keys_count.get("signed_curve25519")
-                more_than_default_otk = len(self.device_one_time_keys_count) > 1 or (
-                    default_otk is not None and default_otk > 0
-                )
-
-                return bool(
-                    more_than_default_otk
-                    or self.device_list_updates
-                    or self.device_unused_fallback_key_types
-                )
-
-        @attr.s(slots=True, frozen=True, auto_attribs=True)
-        class AccountDataExtension:
-            """The Account Data extension (MSC3959)
-
-            Attributes:
-                global_account_data_map: Mapping from `type` to `content` of global account
-                    data events.
-                account_data_by_room_map: Mapping from room_id to mapping of `type` to
-                    `content` of room account data events.
-            """
-
-            global_account_data_map: Mapping[str, JsonMapping]
-            account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]
-
-            def __bool__(self) -> bool:
-                return bool(
-                    self.global_account_data_map or self.account_data_by_room_map
-                )
-
-        @attr.s(slots=True, frozen=True, auto_attribs=True)
-        class ReceiptsExtension:
-            """The Receipts extension (MSC3960)
-
-            Attributes:
-                room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
-                    event (type, content)
-            """
-
-            room_id_to_receipt_map: Mapping[str, JsonMapping]
-
-            def __bool__(self) -> bool:
-                return bool(self.room_id_to_receipt_map)
-
-        @attr.s(slots=True, frozen=True, auto_attribs=True)
-        class TypingExtension:
-            """The Typing Notification extension (MSC3961)
-
-            Attributes:
-                room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
-                    event (type, content)
-            """
-
-            room_id_to_typing_map: Mapping[str, JsonMapping]
-
-            def __bool__(self) -> bool:
-                return bool(self.room_id_to_typing_map)
-
-        to_device: Optional[ToDeviceExtension] = None
-        e2ee: Optional[E2eeExtension] = None
-        account_data: Optional[AccountDataExtension] = None
-        receipts: Optional[ReceiptsExtension] = None
-        typing: Optional[TypingExtension] = None
-
-        def __bool__(self) -> bool:
-            return bool(
-                self.to_device
-                or self.e2ee
-                or self.account_data
-                or self.receipts
-                or self.typing
-            )
-
-    next_pos: SlidingSyncStreamToken
-    lists: Dict[str, SlidingWindowList]
-    rooms: Dict[str, RoomResult]
-    extensions: Extensions
-
-    def __bool__(self) -> bool:
-        """Make the result appear empty if there are no updates. This is used
-        to tell if the notifier needs to wait for more events when polling for
-        events.
-        """
-        # We don't include `self.lists` here, as a) `lists` is always non-empty even if
-        # there are no changes, and b) since we're sorting rooms by `stream_ordering` of
-        # the latest activity, anything that would cause the order to change would end
-        # up in `self.rooms` and cause us to send down the change.
-        return bool(self.rooms or self.extensions)
-
-    @staticmethod
-    def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
-        "Return a new empty result"
-        return SlidingSyncResult(
-            next_pos=next_pos,
-            lists={},
-            rooms={},
-            extensions=SlidingSyncResult.Extensions(),
-        )
|||||||
@@ -18,29 +18,393 @@ from collections import ChainMap
 from enum import Enum
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Callable,
     Dict,
     Final,
     Generic,
+    List,
     Mapping,
     MutableMapping,
     Optional,
+    Sequence,
     Set,
+    Tuple,
     TypeVar,
     cast,
 )
 
 import attr
 
+from synapse._pydantic_compat import HAS_PYDANTIC_V2
 from synapse.api.constants import EventTypes
 from synapse.types import MultiWriterStreamToken, RoomStreamToken, StrCollection, UserID
-from synapse.types.handlers import SlidingSyncConfig
+
+if TYPE_CHECKING or HAS_PYDANTIC_V2:
+    from pydantic.v1 import Extra
+else:
+    from pydantic import Extra
+
+from synapse.events import EventBase
+from synapse.types import (
+    DeviceListUpdates,
+    JsonDict,
+    JsonMapping,
+    Requester,
+    SlidingSyncStreamToken,
+    StreamToken,
+)
+from synapse.types.rest.client import SlidingSyncBody
 
 if TYPE_CHECKING:
-    pass
+    from synapse.handlers.relations import BundledAggregations
 
 logger = logging.getLogger(__name__)
 
+
+# Sliding Sync: The event types that clients should consider as new activity and affect
+# the `bump_stamp`
+SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES = {
+    EventTypes.Create,
+    EventTypes.Message,
+    EventTypes.Encrypted,
+    EventTypes.Sticker,
+    EventTypes.CallInvite,
+    EventTypes.PollStart,
+    EventTypes.LiveLocationShareStart,
+}
+
+
+class SlidingSyncConfig(SlidingSyncBody):
+    """
+    Inherits from `SlidingSyncBody` since we need all of the same fields, plus a few
+    extra fields that we need in the handler.
+    """
+
+    user: UserID
+    requester: Requester
+
+    # Pydantic config
+    class Config:
+        # By default, ignore fields that we don't recognise.
+        extra = Extra.ignore
+        # By default, don't allow fields to be reassigned after parsing.
+        allow_mutation = False
+        # Allow custom types like `UserID` to be used in the model
+        arbitrary_types_allowed = True
+
+
+class OperationType(Enum):
+    """
+    Represents the operation types in a Sliding Sync window.
+
+    Attributes:
+        SYNC: Sets a range of entries. Clients SHOULD discard what they previously knew
+            about entries in this range.
+        INSERT: Sets a single entry. If the position is not empty then clients MUST move
+            entries to the left or the right depending on where the closest empty space is.
+        DELETE: Remove a single entry. Often comes before an INSERT to allow entries to
+            move places.
+        INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range
+            for offline support, but they should be treated as empty when additional
+            operations which concern indexes in the range arrive from the server.
+    """
+
+    SYNC: Final = "SYNC"
+    INSERT: Final = "INSERT"
+    DELETE: Final = "DELETE"
+    INVALIDATE: Final = "INVALIDATE"
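The four `OperationType` values above come from the MSC3575 list-diff protocol. As a rough standalone sketch (not Synapse code; the `apply_ops` helper, the tuple shape, and the window size are all illustrative), a client might apply them to its local window like this. Note the INSERT case is simplified: the spec requires shifting occupied entries toward the nearest empty slot, which is elided here.

```python
from typing import List, Optional, Tuple

# (op, inclusive range, room_ids) triples, mirroring the `Operation` attrs
# class defined further down in this file.
Op = Tuple[str, Tuple[int, int], List[str]]


def apply_ops(window: List[Optional[str]], ops: List[Op]) -> None:
    """Apply sliding-list operations to a client-side window in place."""
    for op, (start, end), room_ids in ops:
        if op == "SYNC":
            # Overwrite the inclusive range, discarding what we knew before.
            for offset, room_id in enumerate(room_ids):
                window[start + offset] = room_id
        elif op == "INVALIDATE":
            # Treat the whole range as empty until the server resends it.
            for index in range(start, end + 1):
                window[index] = None
        elif op == "DELETE":
            # A single-entry removal; `end` equals `start` here.
            window[start] = None
        elif op == "INSERT":
            # Simplified: the spec shifts occupied entries toward the nearest
            # empty slot; this sketch just overwrites the position.
            window[start] = room_ids[0]


window: List[Optional[str]] = [None] * 4
apply_ops(window, [("SYNC", (0, 1), ["!a:example.org", "!b:example.org"])])
apply_ops(window, [("DELETE", (1, 1), []), ("INSERT", (0, 0), ["!c:example.org"])])
assert window == ["!c:example.org", None, None, None]
```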
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class SlidingSyncResult:
+    """
+    The Sliding Sync result to be serialized to JSON for a response.
+
+    Attributes:
+        next_pos: The next position token in the sliding window to request (next_batch).
+        lists: Sliding window API. A map of list key to list results.
+        rooms: Room subscription API. A map of room ID to room results.
+        extensions: Extensions API. A map of extension key to extension results.
+    """
+
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class RoomResult:
+        """
+        Attributes:
+            name: Room name or calculated room name.
+            avatar: Room avatar
+            heroes: List of stripped membership events (containing `user_id` and optionally
+                `avatar_url` and `displayname`) for the users used to calculate the room name.
+            is_dm: Flag to specify whether the room is a direct-message room (most likely
+                between two people).
+            initial: Flag which is set when this is the first time the server is sending this
+                data on this connection. Clients can use this flag to replace or update
+                their local state. When there is an update, servers MUST omit this flag
+                entirely and NOT send "initial":false as this is wasteful on bandwidth. The
+                absence of this flag means 'false'.
+            unstable_expanded_timeline: Flag which is set if we're returning more historic
+                events due to the timeline limit having increased. See "XXX: Odd behavior"
+                comment in `synapse.handlers.sliding_sync`.
+            required_state: The current state of the room
+            timeline: Latest events in the room. The last event is the most recent.
+            bundled_aggregations: A mapping of event ID to the bundled aggregations for
+                the timeline events above. This allows clients to show accurate reaction
+                counts (or edits, threads), even if some of the reaction events were skipped
+                over in a gappy sync.
+            stripped_state: Stripped state events (for rooms where the user is
+                invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
+                absent on joined/left rooms
+            prev_batch: A token that can be passed as a start parameter to the
+                `/rooms/<room_id>/messages` API to retrieve earlier messages.
+            limited: True if there are more events than `timeline_limit` looking
+                backwards from the `response.pos` to the `request.pos`.
+            num_live: The number of timeline events which have just occurred and are not historical.
+                The last N events are 'live' and should be treated as such. This is mostly
+                useful to determine whether a given @mention event should make a noise or not.
+                Clients cannot rely solely on the absence of `initial: true` to determine live
+                events because if a room not in the sliding window bumps into the window because
+                of an @mention it will have `initial: true` yet contain a single live event
+                (with potentially other old events in the timeline).
+            bump_stamp: The `stream_ordering` of the last event according to the
+                `bump_event_types`. This helps clients sort more readily without them
+                needing to pull in a bunch of the timeline to determine the last activity.
+                `bump_event_types` is a thing because, for example, we don't want display
+                name changes to mark the room as unread and bump it to the top. For
+                encrypted rooms, we just have to consider any activity as a bump because we
+                can't see the content and the client has to figure it out for themselves.
+            joined_count: The number of users with membership of join, including the client's
+                own user ID. (same as sync v2 `m.joined_member_count`)
+            invited_count: The number of users with membership of invite. (same as sync v2
+                `m.invited_member_count`)
+            notification_count: The total number of unread notifications for this room. (same
+                as sync v2)
+            highlight_count: The number of unread notifications for this room with the highlight
+                flag set. (same as sync v2)
+        """
+
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class StrippedHero:
+            user_id: str
+            display_name: Optional[str]
+            avatar_url: Optional[str]
+
+        name: Optional[str]
+        avatar: Optional[str]
+        heroes: Optional[List[StrippedHero]]
+        is_dm: bool
+        initial: bool
+        unstable_expanded_timeline: bool
+        # Should be empty for invite/knock rooms with `stripped_state`
+        required_state: List[EventBase]
+        # Should be empty for invite/knock rooms with `stripped_state`
+        timeline_events: List[EventBase]
+        bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
+        # Optional because it's only relevant to invite/knock rooms
+        stripped_state: List[JsonDict]
+        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+        prev_batch: Optional[StreamToken]
+        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+        limited: Optional[bool]
+        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+        num_live: Optional[int]
+        bump_stamp: int
+        joined_count: int
+        invited_count: int
+        notification_count: int
+        highlight_count: int
+
+        def __bool__(self) -> bool:
+            return (
+                # If this is the first time the client is seeing the room, we should not filter it out
+                # under any circumstance.
+                self.initial
+                # We need to let the client know if there are any new events
+                or bool(self.required_state)
+                or bool(self.timeline_events)
+                or bool(self.stripped_state)
+            )
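Aside: the `bump_stamp` semantics described in the `RoomResult` docstring boil down to "the highest `stream_ordering` among events whose type is in the bump set". A hypothetical helper (assumed types and values, not Synapse code) makes the filtering concrete:

```python
from typing import Iterable, Optional, Tuple

# The real set lives in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES above; a couple
# of entries are enough for the illustration.
BUMP_EVENT_TYPES = {"m.room.create", "m.room.message", "m.room.encrypted"}


def compute_bump_stamp(events: Iterable[Tuple[str, int]]) -> Optional[int]:
    """Return the highest stream_ordering among bump-worthy events.

    `events` is assumed to be (event_type, stream_ordering) pairs.
    """
    bump_stamp: Optional[int] = None
    for event_type, stream_ordering in events:
        if event_type in BUMP_EVENT_TYPES and (
            bump_stamp is None or stream_ordering > bump_stamp
        ):
            bump_stamp = stream_ordering
    return bump_stamp


# A display-name change (m.room.member) at ordering 7 does not bump the room.
assert compute_bump_stamp([("m.room.message", 5), ("m.room.member", 7)]) == 5
```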
+
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class SlidingWindowList:
+        """
+        Attributes:
+            count: The total number of entries in the list. Always present if this list
+                is included in the response.
+            ops: The sliding list operations to perform.
+        """
+
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class Operation:
+            """
+            Attributes:
+                op: The operation type to perform.
+                range: Which index positions are affected by this operation. These are
+                    both inclusive.
+                room_ids: Which room IDs are affected by this operation. These IDs match
+                    up to the positions in the `range`, so the last room ID in this list
+                    matches the last index in the `range`. The room data is held in a
+                    separate object.
+            """
+
+            op: OperationType
+            range: Tuple[int, int]
+            room_ids: List[str]
+
+        count: int
+        ops: List[Operation]
+
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class Extensions:
+        """Responses for extensions
+
+        Attributes:
+            to_device: The to-device extension (MSC3885)
+            e2ee: The E2EE device extension (MSC3884)
+        """
+
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class ToDeviceExtension:
+            """The to-device extension (MSC3885)
+
+            Attributes:
+                next_batch: The to-device stream token the client should use
+                    to get more results
+                events: A list of to-device messages for the client
+            """
+
+            next_batch: str
+            events: Sequence[JsonMapping]
+
+            def __bool__(self) -> bool:
+                return bool(self.events)
+
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class E2eeExtension:
+            """The E2EE device extension (MSC3884)
+
+            Attributes:
+                device_list_updates: List of user_ids whose devices have changed or left (only
+                    present on incremental syncs).
+                device_one_time_keys_count: Map from key algorithm to the number of
+                    unclaimed one-time keys currently held on the server for this device. If
+                    an algorithm is unlisted, the count for that algorithm is assumed to be
+                    zero. If this entire parameter is missing, the count for all algorithms
+                    is assumed to be zero.
+                device_unused_fallback_key_types: List of unused fallback key algorithms
+                    for this device.
+            """
+
+            # Only present on incremental syncs
+            device_list_updates: Optional[DeviceListUpdates]
+            device_one_time_keys_count: Mapping[str, int]
+            device_unused_fallback_key_types: Sequence[str]
+
+            def __bool__(self) -> bool:
+                # Note that "signed_curve25519" is always returned in key count responses
+                # regardless of whether we uploaded any keys for it. This is necessary until
+                # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
+                #
+                # Also related:
+                # https://github.com/element-hq/element-android/issues/3725 and
+                # https://github.com/matrix-org/synapse/issues/10456
+                default_otk = self.device_one_time_keys_count.get("signed_curve25519")
+                more_than_default_otk = len(self.device_one_time_keys_count) > 1 or (
+                    default_otk is not None and default_otk > 0
+                )
+
+                return bool(
+                    more_than_default_otk
+                    or self.device_list_updates
+                    or self.device_unused_fallback_key_types
+                )
+
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class AccountDataExtension:
+            """The Account Data extension (MSC3959)
+
+            Attributes:
+                global_account_data_map: Mapping from `type` to `content` of global account
+                    data events.
+                account_data_by_room_map: Mapping from room_id to mapping of `type` to
+                    `content` of room account data events.
+            """
+
+            global_account_data_map: Mapping[str, JsonMapping]
+            account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]
+
+            def __bool__(self) -> bool:
+                return bool(
+                    self.global_account_data_map or self.account_data_by_room_map
+                )
+
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class ReceiptsExtension:
+            """The Receipts extension (MSC3960)
+
+            Attributes:
+                room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
+                    event (type, content)
+            """
+
+            room_id_to_receipt_map: Mapping[str, JsonMapping]
+
+            def __bool__(self) -> bool:
+                return bool(self.room_id_to_receipt_map)
+
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class TypingExtension:
+            """The Typing Notification extension (MSC3961)
+
+            Attributes:
+                room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
+                    event (type, content)
+            """
+
+            room_id_to_typing_map: Mapping[str, JsonMapping]
+
+            def __bool__(self) -> bool:
+                return bool(self.room_id_to_typing_map)
+
+        to_device: Optional[ToDeviceExtension] = None
+        e2ee: Optional[E2eeExtension] = None
+        account_data: Optional[AccountDataExtension] = None
+        receipts: Optional[ReceiptsExtension] = None
+        typing: Optional[TypingExtension] = None
+
+        def __bool__(self) -> bool:
+            return bool(
+                self.to_device
+                or self.e2ee
+                or self.account_data
+                or self.receipts
+                or self.typing
+            )
+
+    next_pos: SlidingSyncStreamToken
+    lists: Dict[str, SlidingWindowList]
+    rooms: Dict[str, RoomResult]
+    extensions: Extensions
+
+    def __bool__(self) -> bool:
+        """Make the result appear empty if there are no updates. This is used
+        to tell if the notifier needs to wait for more events when polling for
+        events.
+        """
+        # We don't include `self.lists` here, as a) `lists` is always non-empty even if
+        # there are no changes, and b) since we're sorting rooms by `stream_ordering` of
+        # the latest activity, anything that would cause the order to change would end
+        # up in `self.rooms` and cause us to send down the change.
+        return bool(self.rooms or self.extensions)
+
+    @staticmethod
+    def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
+        "Return a new empty result"
+        return SlidingSyncResult(
+            next_pos=next_pos,
+            lists={},
+            rooms={},
+            extensions=SlidingSyncResult.Extensions(),
+        )
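Because `SlidingSyncResult.__bool__` is falsy when there is nothing to send, the handler can long-poll by re-building the result until it is truthy. A simplified sketch of that loop (the `build_result` callable is hypothetical, and the real server waits on the notifier rather than sleeping):

```python
import time
from typing import Callable


def long_poll(build_result: Callable[[], "SlidingSyncResult"], timeout_s: float = 30.0):
    # `build_result` stands in for the handler assembling a SlidingSyncResult;
    # its truthiness is exactly the __bool__ defined above.
    deadline = time.monotonic() + timeout_s
    while True:
        result = build_result()
        if result or time.monotonic() >= deadline:
            # Either we have room/extension updates to send, or the long-poll
            # timed out and we return the empty result as-is.
            return result
        time.sleep(0.1)  # placeholder for waiting on the notifier
```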
+
+
 class StateValues:
     """
@@ -60,7 +424,7 @@ class StateValues:
 
 # We can't freeze this class because we want to update it in place with the
 # de-duplicated data.
-@attr.s(slots=True, auto_attribs=True)
+@attr.s(slots=True, auto_attribs=True, frozen=True)
 class RoomSyncConfig:
     """
     Holds the config for what data we should fetch for a room in the sync response.
@@ -74,7 +438,7 @@ class RoomSyncConfig:
     """
 
     timeline_limit: int
-    required_state_map: Dict[str, Set[str]]
+    required_state_map: Mapping[str, AbstractSet[str]]
 
     @classmethod
     def from_room_config(
@@ -148,7 +512,7 @@ class RoomSyncConfig:
 
     def deep_copy(self) -> "RoomSyncConfig":
         required_state_map: Dict[str, Set[str]] = {
-            state_type: state_key_set.copy()
+            state_type: set(state_key_set)
            for state_type, state_key_set in self.required_state_map.items()
         }
 
@@ -159,14 +523,20 @@ class RoomSyncConfig:
 
     def combine_room_sync_config(
         self, other_room_sync_config: "RoomSyncConfig"
-    ) -> None:
+    ) -> "RoomSyncConfig":
         """
-        Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the
+        Combine this `RoomSyncConfig` with another `RoomSyncConfig` and return the
         superset union of the two.
         """
+        timeline_limit = self.timeline_limit
+        required_state_map = {
+            event_type: set(state_keys)
+            for event_type, state_keys in self.required_state_map.items()
+        }
+
         # Take the highest timeline limit
         if self.timeline_limit < other_room_sync_config.timeline_limit:
-            self.timeline_limit = other_room_sync_config.timeline_limit
+            timeline_limit = other_room_sync_config.timeline_limit
 
         # Union the required state
         for (
@@ -175,14 +545,14 @@ class RoomSyncConfig:
         ) in other_room_sync_config.required_state_map.items():
             # If we already have a wildcard for everything, we don't need to add
             # anything else
-            if StateValues.WILDCARD in self.required_state_map.get(
+            if StateValues.WILDCARD in required_state_map.get(
                 StateValues.WILDCARD, set()
             ):
                 break
 
             # If we already have a wildcard `state_key` for this `state_type`, we don't need
             # to add anything else
-            if StateValues.WILDCARD in self.required_state_map.get(state_type, set()):
+            if StateValues.WILDCARD in required_state_map.get(state_type, set()):
                 continue
 
             # If we're getting wildcards for the `state_type` and `state_key`, that's
@@ -191,16 +561,14 @@ class RoomSyncConfig:
                 state_type == StateValues.WILDCARD
                 and StateValues.WILDCARD in state_key_set
             ):
-                self.required_state_map = {state_type: {StateValues.WILDCARD}}
+                required_state_map = {state_type: {StateValues.WILDCARD}}
                 # We can break, since we don't need to add anything else
                 break
 
             for state_key in state_key_set:
                 # If we already have a wildcard for this specific `state_key`, we don't need
                 # to add it since the wildcard already covers it.
-                if state_key in self.required_state_map.get(
-                    StateValues.WILDCARD, set()
-                ):
+                if state_key in required_state_map.get(StateValues.WILDCARD, set()):
                     continue
 
                 # If we're getting a wildcard for the `state_type`, get rid of any other
@@ -211,7 +579,7 @@ class RoomSyncConfig:
                 # Make a copy so we don't run into an error: `dictionary changed size
                 # during iteration`, when we remove items
                 for existing_state_type, existing_state_key_set in list(
-                    self.required_state_map.items()
+                    required_state_map.items()
                 ):
                     # Make a copy so we don't run into an error: `Set changed size during
                     # iteration`, when we filter out and remove items
@@ -221,19 +589,21 @@ class RoomSyncConfig:
 
                     # If we've left the `set()` empty, remove it from the map
                     if existing_state_key_set == set():
-                        self.required_state_map.pop(existing_state_type, None)
+                        required_state_map.pop(existing_state_type, None)
 
                 # If we're getting a wildcard `state_key`, get rid of any other state_keys
                 # for this `state_type` since the wildcard will cover it already.
                 if state_key == StateValues.WILDCARD:
-                    self.required_state_map[state_type] = {state_key}
+                    required_state_map[state_type] = {state_key}
                     break
                 # Otherwise, just add it to the set
                 else:
-                    if self.required_state_map.get(state_type) is None:
-                        self.required_state_map[state_type] = {state_key}
+                    if required_state_map.get(state_type) is None:
+                        required_state_map[state_type] = {state_key}
                     else:
-                        self.required_state_map[state_type].add(state_key)
+                        required_state_map[state_type].add(state_key)
+
+        return RoomSyncConfig(timeline_limit, required_state_map)
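A small usage sketch of the union semantics implemented above, assuming `RoomSyncConfig` is constructed with keyword arguments as below and that `StateValues.WILDCARD` is the `"*"` sentinel:

```python
config_a = RoomSyncConfig(
    timeline_limit=10,
    required_state_map={"m.room.member": {"@alice:test"}},
)
config_b = RoomSyncConfig(
    timeline_limit=20,
    # "*" stands in for the StateValues.WILDCARD sentinel.
    required_state_map={"m.room.member": {"*"}},
)

combined = config_a.combine_room_sync_config(config_b)
assert combined.timeline_limit == 20  # the higher of the two limits wins
# The wildcard state_key swallows the specific `@alice:test` entry.
assert combined.required_state_map == {"m.room.member": {"*"}}
```

Returning a fresh `RoomSyncConfig` (rather than mutating `self` as before) is what allows the class to be frozen above.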
 
     def must_await_full_state(
         self,
@@ -324,7 +694,7 @@ class HaveSentRoomFlag(Enum):
     LIVE = "live"
 
 
-T = TypeVar("T")
+T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken)
 
 
 @attr.s(auto_attribs=True, slots=True, frozen=True)
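Constraining the `TypeVar` to the three concrete token types lets mypy reject any other instantiation of the generic status maps below. A minimal standalone illustration of value-constrained TypeVars (`Token` and `StatusMap` are made up for the example):

```python
from typing import Generic, TypeVar

Token = TypeVar("Token", str, int)  # the value-constrained analogue of `T` above


class StatusMap(Generic[Token]):
    def __init__(self) -> None:
        # Values must all be the single concrete type chosen for `Token`.
        self._statuses: dict[str, Token] = {}


ok: StatusMap[str] = StatusMap()  # accepted
# bad: StatusMap[float] = StatusMap()
# mypy: Value of type variable "Token" of "StatusMap" cannot be "float"
```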
@@ -383,6 +753,9 @@ class RoomStatusMap(Generic[T]):
 
         return RoomStatusMap(statuses=dict(self._statuses))
 
+    def __len__(self) -> int:
+        return len(self._statuses)
+
 
 class MutableRoomStatusMap(RoomStatusMap[T]):
     """A mutable version of `RoomStatusMap`"""
@@ -439,7 +812,7 @@ class MutableRoomStatusMap(RoomStatusMap[T]):
         self._statuses[room_id] = HaveSentRoom.previously(from_token)
 
 
-@attr.s(auto_attribs=True)
+@attr.s(auto_attribs=True, frozen=True)
 class PerConnectionState:
     """The per-connection state. A snapshot of what we've sent down the
     connection before.
@@ -467,14 +840,18 @@ class PerConnectionState:
 
     room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
 
+    list_to_rooms: Mapping[str, AbstractSet[str]] = attr.Factory(dict)
+
     def get_mutable(self) -> "MutablePerConnectionState":
         """Get a mutable copy of this state."""
         room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs)
+        list_to_rooms = cast(MutableMapping[str, Set[str]], self.list_to_rooms)
 
         return MutablePerConnectionState(
             rooms=self.rooms.get_mutable(),
             receipts=self.receipts.get_mutable(),
             room_configs=ChainMap({}, room_configs),
+            list_to_rooms=ChainMap({}, list_to_rooms),
         )
 
     def copy(self) -> "PerConnectionState":
@@ -482,6 +859,15 @@ class PerConnectionState:
             rooms=self.rooms.copy(),
             receipts=self.receipts.copy(),
             room_configs=dict(self.room_configs),
+            list_to_rooms=dict(self.list_to_rooms),
+        )
+
+    def __len__(self) -> int:
+        return (
+            len(self.rooms)
+            + len(self.receipts)
+            + len(self.room_configs)
+            + len(self.list_to_rooms)
         )
 
 
@@ -494,13 +880,20 @@ class MutablePerConnectionState(PerConnectionState):
 
     room_configs: typing.ChainMap[str, RoomSyncConfig]
 
+    list_to_rooms: typing.ChainMap[str, Set[str]]
+
     def has_updates(self) -> bool:
         return (
             bool(self.rooms.get_updates())
             or bool(self.receipts.get_updates())
             or bool(self.get_room_config_updates())
+            or bool(self.list_to_rooms.maps[0])
         )
 
     def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
         """Get updates to the room sync config"""
         return self.room_configs.maps[0]
+
+    def get_list_to_rooms_updates(self) -> Mapping[str, StrCollection]:
+        """Get updates to the `list_to_rooms`"""
+        return self.list_to_rooms.maps[0]
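The `ChainMap({}, base)` construction used in `get_mutable()` is what makes `maps[0]` usable as the per-request delta in `get_room_config_updates()` and `get_list_to_rooms_updates()`: writes land in the empty front map while reads fall through to the snapshot. A standalone demonstration:

```python
from collections import ChainMap

base = {"!room:test": "config-v1"}  # the immutable snapshot
mutable = ChainMap({}, base)        # writes go to the front map only

mutable["!room:test"] = "config-v2"
assert mutable.maps[0] == {"!room:test": "config-v2"}  # exactly the delta
assert base == {"!room:test": "config-v1"}             # snapshot untouched
assert mutable["!room:test"] == "config-v2"            # reads see the update
```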
@@ -18,7 +18,6 @@
 #
 #
 import logging
-from copy import deepcopy
 from typing import Dict, List, Optional
 from unittest.mock import patch
 
@@ -47,7 +46,7 @@ from synapse.rest.client import knock, login, room
 from synapse.server import HomeServer
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import JsonDict, StreamToken, UserID
-from synapse.types.handlers import SlidingSyncConfig
+from synapse.types.handlers.sliding_sync import SlidingSyncConfig
 from synapse.util import Clock
 
 from tests.replication._base import BaseMultiWorkerStreamTestCase
@@ -566,23 +565,11 @@ class RoomSyncConfigTestCase(TestCase):
         """
         Combine A into B and B into A to make sure we get the same result.
         """
-        # Since we're mutating these in place, make a copy for each of our trials
-        room_sync_config_a = deepcopy(a)
-        room_sync_config_b = deepcopy(b)
-
-        # Combine B into A
-        room_sync_config_a.combine_room_sync_config(room_sync_config_b)
-
-        self._assert_room_config_equal(room_sync_config_a, expected, "B into A")
-
-        # Since we're mutating these in place, make a copy for each of our trials
-        room_sync_config_a = deepcopy(a)
-        room_sync_config_b = deepcopy(b)
-
-        # Combine A into B
-        room_sync_config_b.combine_room_sync_config(room_sync_config_a)
-
-        self._assert_room_config_equal(room_sync_config_b, expected, "A into B")
+        combined_config = a.combine_room_sync_config(b)
+        self._assert_room_config_equal(combined_config, expected, "B into A")
+
+        combined_config = b.combine_room_sync_config(a)
+        self._assert_room_config_equal(combined_config, expected, "A into B")
 
 
 class GetRoomMembershipForUserAtToTokenTestCase(HomeserverTestCase):
@@ -16,7 +16,7 @@ import logging
 from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventContentFields, EventTypes, Membership
 from synapse.api.room_versions import RoomVersions
 from synapse.rest.client import login, room, sync
 from synapse.server import HomeServer
@@ -44,6 +44,10 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
         self.storage_controllers = hs.get_storage_controllers()
+        self.state_handler = self.hs.get_state_handler()
+        persistence = self.hs.get_storage_controllers().persistence
+        assert persistence is not None
+        self.persistence = persistence
 
     def test_rooms_meta_when_joined(self) -> None:
         """
@@ -600,16 +604,16 @@
         Test that `bump_stamp` ignores backfilled events, i.e. events with a
         negative stream ordering.
         """
-
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
 
         # Create a remote room
         creator = "@user:other"
         room_id = "!foo:other"
+        room_version = RoomVersions.V10
         shared_kwargs = {
             "room_id": room_id,
-            "room_version": "10",
+            "room_version": room_version.identifier,
         }
 
         create_tuple = self.get_success(
@@ -618,6 +622,12 @@
                 prev_event_ids=[],
                 type=EventTypes.Create,
                 state_key="",
+                content={
+                    # The `ROOM_CREATOR` field could be removed if we used a room
+                    # version > 10 (in favor of relying on `sender`)
+                    EventContentFields.ROOM_CREATOR: creator,
+                    EventContentFields.ROOM_VERSION: room_version.identifier,
+                },
                 sender=creator,
                 **shared_kwargs,
             )
@@ -667,22 +677,29 @@
         ]
 
         # Ensure the local HS knows the room version
-        self.get_success(
-            self.store.store_room(room_id, creator, False, RoomVersions.V10)
-        )
+        self.get_success(self.store.store_room(room_id, creator, False, room_version))
 
         # Persist these events as backfilled events.
-        persistence = self.hs.get_storage_controllers().persistence
-        assert persistence is not None
-
         for event, context in remote_events_and_contexts:
-            self.get_success(persistence.persist_event(event, context, backfilled=True))
+            self.get_success(
+                self.persistence.persist_event(event, context, backfilled=True)
+            )
 
-        # Now we join the local user to the room
-        join_tuple = self.get_success(
+        # Now we join the local user to the room. We want to make this feel as close to
+        # the real `process_remote_join()` as possible but we'd like to avoid some of
+        # the auth checks that would be done in the real code.
+        #
+        # FIXME: The test was originally written using this less-real
+        # `persist_event(...)` shortcut but it would be nice to use the real remote join
+        # process in a `FederatingHomeserverTestCase`.
+        flawed_join_tuple = self.get_success(
             create_event(
                 self.hs,
                 prev_event_ids=[invite_tuple[0].event_id],
+                # This doesn't work correctly to create an `EventContext` that includes
+                # both of these state events. I assume it's because we're working on our
+                # local homeserver which has the remote state set as `outlier`. We have
+                # to create our own EventContext below to get this right.
                 auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id],
                 type=EventTypes.Member,
                 state_key=user1_id,
@@ -691,7 +708,22 @@
                 **shared_kwargs,
             )
         )
-        self.get_success(persistence.persist_event(*join_tuple))
+        # We have to create our own context to get the state set correctly. If we use
+        # the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
+        # table will only have the join event in it which should never happen in our
+        # real server.
+        join_event = flawed_join_tuple[0]
+        join_context = self.get_success(
+            self.state_handler.compute_event_context(
+                join_event,
+                state_ids_before_event={
+                    (e.type, e.state_key): e.event_id
+                    for e in [create_tuple[0], invite_tuple[0]]
+                },
+                partial_state=False,
+            )
+        )
+        self.get_success(self.persistence.persist_event(join_event, join_context))
 
         # Doing an SS request should return a positive `bump_stamp`, even though
         # the only event that matches the bump types has a negative stream
@@ -191,8 +191,14 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
         }
         _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # Reset the in-memory cache
-        self.hs.get_sliding_sync_handler().connection_store._connections.clear()
+        # Reset the positions
+        self.get_success(
+            self.store.db_pool.simple_delete(
+                table="sliding_sync_connections",
+                keyvalues={"user_id": user1_id},
+                desc="clear_cache",
+            )
+        )
 
         # Make the Sliding Sync request
         channel = self.make_request(
@@ -112,6 +112,24 @@ class UpdateUpsertManyTests(unittest.HomeserverTestCase):
             {(1, "user1", "hello"), (2, "user2", "bleb")},
         )
 
+        self.get_success(
+            self.storage.db_pool.runInteraction(
+                "test",
+                self.storage.db_pool.simple_upsert_many_txn,
+                self.table_name,
+                key_names=key_names,
+                key_values=[[2, "user2"]],
+                value_names=[],
+                value_values=[],
+            )
+        )
+
+        # Check results are what we expect
+        self.assertEqual(
+            set(self._dump_table_to_tuple()),
+            {(1, "user1", "hello"), (2, "user2", "bleb")},
+        )
+
     def test_simple_update_many(self) -> None:
         """
         simple_update_many performs many updates at once.
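The new assertions exercise `simple_upsert_many_txn` with key columns but no value columns, which should behave like "insert if missing, otherwise change nothing". A standalone sqlite3 sketch (assumed table name and schema) of the equivalent SQL semantics:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tbl (id INTEGER, username TEXT, value TEXT,"
    " PRIMARY KEY (id, username))"
)
conn.executemany(
    "INSERT INTO tbl VALUES (?, ?, ?)",
    [(1, "user1", "hello"), (2, "user2", "bleb")],
)

# Key-only upsert: nothing to update on conflict, so the existing row wins.
conn.execute(
    "INSERT INTO tbl (id, username) VALUES (2, 'user2')"
    " ON CONFLICT (id, username) DO NOTHING"
)

assert set(conn.execute("SELECT * FROM tbl")) == {
    (1, "user1", "hello"),
    (2, "user2", "bleb"),
}
```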
@@ -19,6 +19,7 @@
 #
 #
 
+import logging
 from typing import List, Optional
 
 from twisted.test.proto_helpers import MemoryReactor
@@ -35,6 +36,8 @@ from synapse.util import Clock
 
 from tests.unittest import HomeserverTestCase
 
+logger = logging.getLogger(__name__)
+
 
 class ExtremPruneTestCase(HomeserverTestCase):
     servlets = [
@@ -24,7 +24,7 @@ from typing import List, Optional, Tuple, cast
 
 from twisted.test.proto_helpers import MemoryReactor
 
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import EventContentFields, EventTypes, JoinRules, Membership
 from synapse.api.room_versions import RoomVersions
 from synapse.rest import admin
 from synapse.rest.admin import register_servlets_for_client_rest_resource
@@ -38,6 +38,7 @@ from synapse.util import Clock
 from tests import unittest
 from tests.server import TestHomeServer
 from tests.test_utils import event_injection
+from tests.test_utils.event_injection import create_event
 from tests.unittest import skip_unless
 
 logger = logging.getLogger(__name__)
@@ -54,6 +55,10 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
         # We can't test the RoomMemberStore on its own without the other event
         # storage logic
         self.store = hs.get_datastores().main
+        self.state_handler = self.hs.get_state_handler()
+        persistence = self.hs.get_storage_controllers().persistence
+        assert persistence is not None
+        self.persistence = persistence
 
         self.u_alice = self.register_user("alice", "pass")
         self.t_alice = self.login("alice", "pass")
@@ -220,31 +225,166 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
         )
 
     def test_join_locally_forgotten_room(self) -> None:
-        """Tests if a user joins a forgotten room the room is not forgotten anymore."""
-        self.room = self.helper.create_room_as(self.u_alice, tok=self.t_alice)
-        self.assertFalse(
-            self.get_success(self.store.is_locally_forgotten_room(self.room))
-        )
+        """
+        Tests that if a user joins a forgotten room, the room is not forgotten anymore.
+
+        Since a room can't be re-joined if everyone has left, this can only happen with
+        a room that has remote users in it.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a remote room
+        creator = "@user:other"
+        room_id = "!foo:other"
+        room_version = RoomVersions.V10
+        shared_kwargs = {
+            "room_id": room_id,
+            "room_version": room_version.identifier,
+        }
+
+        create_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[],
+                type=EventTypes.Create,
+                state_key="",
+                content={
+                    # The `ROOM_CREATOR` field could be removed if we used a room
+                    # version > 10 (in favor of relying on `sender`)
+                    EventContentFields.ROOM_CREATOR: creator,
+                    EventContentFields.ROOM_VERSION: room_version.identifier,
+                },
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
+        creator_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[create_tuple[0].event_id],
+                auth_event_ids=[create_tuple[0].event_id],
+                type=EventTypes.Member,
+                state_key=creator,
+                content={"membership": Membership.JOIN},
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
 
-        # after leaving and forget the room, it is forgotten
-        self.get_success(
-            event_injection.inject_member_event(
-                self.hs, self.room, self.u_alice, "leave"
-            )
-        )
-        self.get_success(self.store.forget(self.u_alice, self.room))
-        self.assertTrue(
-            self.get_success(self.store.is_locally_forgotten_room(self.room))
-        )
+        remote_events_and_contexts = [
+            create_tuple,
+            creator_tuple,
+        ]
+
+        # Ensure the local HS knows the room version
+        self.get_success(self.store.store_room(room_id, creator, False, room_version))
+
+        # Persist these events as backfilled events.
+        for event, context in remote_events_and_contexts:
+            self.get_success(
+                self.persistence.persist_event(event, context, backfilled=True)
+            )
 
-        # after rejoin the room is not forgotten anymore
-        self.get_success(
-            event_injection.inject_member_event(
-                self.hs, self.room, self.u_alice, "join"
-            )
-        )
+        # Now we join the local user to the room. We want to make this feel as close to
+        # the real `process_remote_join()` as possible but we'd like to avoid some of
+        # the auth checks that would be done in the real code.
+        #
+        # FIXME: The test was originally written using this less-real
+        # `persist_event(...)` shortcut but it would be nice to use the real remote join
+        # process in a `FederatingHomeserverTestCase`.
+        flawed_join_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[creator_tuple[0].event_id],
+                # This doesn't work correctly to create an `EventContext` that includes
+                # both of these state events. I assume it's because we're working on our
+                # local homeserver which has the remote state set as `outlier`. We have
+                # to create our own EventContext below to get this right.
+                auth_event_ids=[create_tuple[0].event_id],
+                type=EventTypes.Member,
+                state_key=user1_id,
+                content={"membership": Membership.JOIN},
+                sender=user1_id,
+                **shared_kwargs,
+            )
+        )
+        # We have to create our own context to get the state set correctly. If we use
+        # the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
+        # table will only have the join event in it which should never happen in our
+        # real server.
+        join_event = flawed_join_tuple[0]
+        join_context = self.get_success(
+            self.state_handler.compute_event_context(
+                join_event,
+                state_ids_before_event={
+                    (e.type, e.state_key): e.event_id for e in [create_tuple[0]]
+                },
+                partial_state=False,
+            )
+        )
+        self.get_success(self.persistence.persist_event(join_event, join_context))
+
+        # The room shouldn't be forgotten because the local user just joined
         self.assertFalse(
-            self.get_success(self.store.is_locally_forgotten_room(self.room))
+            self.get_success(self.store.is_locally_forgotten_room(room_id))
         )
+
+        # After all of the local users (there is only user1) leave and forgetting the
+        # room, it is forgotten
+        user1_leave_response = self.helper.leave(room_id, user1_id, tok=user1_tok)
+        user1_leave_event = self.get_success(
+            self.store.get_event(user1_leave_response["event_id"])
+        )
+        self.get_success(self.store.forget(user1_id, room_id))
+        self.assertTrue(self.get_success(self.store.is_locally_forgotten_room(room_id)))
+
+        # Join the local user to the room (again). We want to make this feel as close to
+        # the real `process_remote_join()` as possible but we'd like to avoid some of
+        # the auth checks that would be done in the real code.
+        #
+        # FIXME: The test was originally written using this less-real
+        # `event_injection.inject_member_event(...)` shortcut but it would be nice to
+        # use the real remote join process in a `FederatingHomeserverTestCase`.
+        flawed_join_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[user1_leave_response["event_id"]],
+                # This doesn't work correctly to create an `EventContext` that includes
+                # both of these state events. I assume it's because we're working on our
+                # local homeserver which has the remote state set as `outlier`. We have
+                # to create our own EventContext below to get this right.
+                auth_event_ids=[
+                    create_tuple[0].event_id,
+                    user1_leave_response["event_id"],
+                ],
+                type=EventTypes.Member,
+                state_key=user1_id,
+                content={"membership": Membership.JOIN},
+                sender=user1_id,
+                **shared_kwargs,
+            )
+        )
+        # We have to create our own context to get the state set correctly. If we use
+        # the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
+        # table will only have the join event in it which should never happen in our
+        # real server.
+        join_event = flawed_join_tuple[0]
+        join_context = self.get_success(
+            self.state_handler.compute_event_context(
+                join_event,
+                state_ids_before_event={
+                    (e.type, e.state_key): e.event_id
+                    for e in [create_tuple[0], user1_leave_event]
+                },
+                partial_state=False,
+            )
+        )
+        self.get_success(self.persistence.persist_event(join_event, join_context))
+
+        # After the local user rejoins the remote room, it isn't forgotten anymore
+        self.assertFalse(
+            self.get_success(self.store.is_locally_forgotten_room(room_id))
+        )
 
 
4799  tests/storage/test_sliding_sync_tables.py  (new file)
File diff suppressed because it is too large.
@@ -272,8 +272,8 @@ class TestCase(unittest.TestCase):
 
     def assertIncludes(
         self,
-        actual_items: AbstractSet[str],
-        expected_items: AbstractSet[str],
+        actual_items: AbstractSet[TV],
+        expected_items: AbstractSet[TV],
         exact: bool = False,
         message: Optional[str] = None,
     ) -> None: