Compare commits

...

159 Commits

Author SHA1 Message Date
Erik Johnston
4fd68e2736 FIXUP 2024-08-28 15:05:56 +01:00
Erik Johnston
c5a90575bf FIXUP 2024-08-28 15:04:48 +01:00
Erik Johnston
a217155570 Merge branch 'madlittlemods/sliding-sync-pre-populate-room-meta-data' into erikj/ss_hacks 2024-08-28 14:58:31 +01:00
Erik Johnston
90d0e035dd Fix port script tests by handling empty DBs correctly 2024-08-28 14:39:25 +01:00
Erik Johnston
ab414f2ab8 Use event_auth table to get previous membership 2024-08-28 14:23:32 +01:00
Erik Johnston
6f9932d146 Handle old rows with null event_stream_ordering column 2024-08-28 14:23:06 +01:00
Erik Johnston
bb905cd02c Only run the sliding sync background updates on the main database 2024-08-28 11:44:56 +01:00
Erik Johnston
7c9c62051c Remove all rooms pulled out from the queue 2024-08-28 11:31:19 +01:00
Eric Eastwood
da463fb102 Add unique index right away for sliding_sync_joined_rooms_to_recalculate
This makes it so we can always `upsert` to avoid duplicates. Otherwise,
I'm not sure how to avoid inserting duplicates in certain situations
(see FIXME in the diff) which would cause problems down the line
for the unique index being added later.
2024-08-28 00:50:33 -05:00
Eric Eastwood
8468401a97 Adapt to using sliding_sync_joined_rooms_to_recalculate table 2024-08-28 00:42:14 -05:00
Eric Eastwood
53b7309f6c Add sliding_sync_joined_rooms_to_recalculate table 2024-08-27 20:48:56 -05:00
Eric Eastwood
94e1a54687 get_events(...) will omit events from unknown room versions
Thanks @erikjohnston
2024-08-27 20:01:55 -05:00
Eric Eastwood
a507f152c9 Use stream_id of some point before we fetch the current state
This is simpler and some rooms are so old that they don't have
`current_state_delta_stream` yet. It's easier if we just get a
general max `stream_id` of the whole table than the max `stream_id`
for the specific room anyway.

Thanks @erikjohnston
2024-08-27 19:45:50 -05:00
Eric Eastwood
9d08bc2157 Remove debug logs 2024-08-27 19:35:05 -05:00
Eric Eastwood
56a4c0ba6e Round out tests 2024-08-27 19:34:16 -05:00
Eric Eastwood
85a60c3132 More tests 2024-08-27 19:27:24 -05:00
Eric Eastwood
e5e7269998 Add more tests 2024-08-27 18:49:53 -05:00
Eric Eastwood
c8e17f7479 Add test when no rooms 2024-08-27 18:21:25 -05:00
Eric Eastwood
4dc9e268e6 Add test for catch-up background update 2024-08-27 18:08:17 -05:00
Eric Eastwood
9a7d8c2be4 Start catch-up if nothing written yet 2024-08-27 17:28:07 -05:00
Eric Eastwood
c51a309da5 Maybe: always start background update 2024-08-27 17:11:51 -05:00
Eric Eastwood
9764f626ea Fix query in Postgres 2024-08-27 12:09:00 -05:00
Eric Eastwood
7a0c281028 Add placeholder tests 2024-08-26 19:43:52 -05:00
Eric Eastwood
7fe5d31e20 Note down caveat about forgotten 2024-08-26 18:52:10 -05:00
Eric Eastwood
53473a0eb4 Adapt sliding_sync_joined_rooms background update to use event_stream_ordering for progress
This way we can re-use it for the catch-up background process
2024-08-26 18:35:49 -05:00
Eric Eastwood
eb3c84cf45 Kick-off background update for out-of-date snapshots 2024-08-26 17:31:26 -05:00
Eric Eastwood
6a44686dc3 Why it matters 2024-08-26 16:32:59 -05:00
Eric Eastwood
a94c1dd62c Add more context for why 2024-08-26 16:27:20 -05:00
Eric Eastwood
8bddbe23bd Clear out-of-date rows 2024-08-26 16:19:47 -05:00
Eric Eastwood
addb91485f Split test cases 2024-08-26 16:11:56 -05:00
Eric Eastwood
9795556052 Update comment 2024-08-26 14:42:55 -05:00
Erik Johnston
c44db28958 Merge branch 'erikj/ss_sort_cache' into erikj/ss_hacks 2024-08-23 15:49:31 +01:00
Erik Johnston
c6204d3fa1 Speed up sort rooms via cache 2024-08-23 15:49:15 +01:00
Erik Johnston
1ead5e0c0e Merge branch 'erikj/ss_store_lists' into erikj/ss_hacks 2024-08-23 11:34:33 +01:00
Erik Johnston
8d1d8f9b3b Add TODO 2024-08-23 11:34:17 +01:00
Erik Johnston
651e520292 Store list data 2024-08-23 11:33:31 +01:00
Erik Johnston
f457dbee35 Add room_to_lists field. 2024-08-23 11:24:41 +01:00
Erik Johnston
77d3fa8b9e Fixup 2024-08-23 10:26:06 +01:00
Erik Johnston
4a9d81f6ad Fixup 2024-08-23 10:12:16 +01:00
Erik Johnston
96f476d9b4 Merge remote-tracking branch 'origin/madlittlemods/sliding-sync-pre-populate-room-meta-data' into erikj/ss_hacks 2024-08-23 10:00:55 +01:00
Erik Johnston
e2501a0bd7 Merge remote-tracking branch 'origin/erikj/ss_store_state' into erikj/ss_hacks 2024-08-23 09:54:21 +01:00
Eric Eastwood
a57d47b778 Use simple_upsert_txn for sliding_sync_membership_snapshots in background update 2024-08-22 23:59:06 -05:00
Eric Eastwood
b6a7d2bf6c Use simple_upsert_txn for sliding_sync_joined_rooms in background update 2024-08-22 23:30:00 -05:00
Eric Eastwood
f8926d07df Fix partial-stated room re-syncing state but nothing has changed
Fixes failing test in CI: `tests.handlers.test_federation.PartialJoinTestCase.test_failed_partial_join_is_clean`
```
2024-08-22 18:57:22-0500 [-] synapse.metrics.background_process_metrics - 253 - ERROR - sync_partial_state_room-0 - Background process 'sync_partial_state_room' threw an exception
	Traceback (most recent call last):
	  File "synapse/synapse/metrics/background_process_metrics.py", line 251, in run
	    return await func(*args, **kwargs)
	           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
	  File "synapse/synapse/handlers/federation.py", line 1842, in _sync_partial_state_room_wrapper
	    await self._sync_partial_state_room(
	  File "synapse/synapse/handlers/federation.py", line 1933, in _sync_partial_state_room
	    await self.state_handler.update_current_state(room_id)
	  File "synapse/synapse/state/__init__.py", line 554, in update_current_state
	    await self._storage_controllers.persistence.update_current_state(room_id)
	  File "synapse/synapse/storage/controllers/persist_events.py", line 491, in update_current_state
	    await self._event_persist_queue.add_to_queue(
	  File "synapse/synapse/storage/controllers/persist_events.py", line 245, in add_to_queue
	    res = await make_deferred_yieldable(end_item.deferred.observe())
	          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
	  File "synapse/synapse/storage/controllers/persist_events.py", line 288, in handle_queue_loop
	    ret = await self._per_item_callback(room_id, item.task)
	          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
	  File "synapse/synapse/storage/controllers/persist_events.py", line 370, in _process_event_persist_queue_task
	    await self._update_current_state(room_id, task)
	  File "synapse/synapse/storage/controllers/persist_events.py", line 507, in _update_current_state
	    await self.persist_events_store._calculate_sliding_sync_table_changes(
	  File "synapse/synapse/storage/databases/main/events.py", line 624, in _calculate_sliding_sync_table_changes
	    assert most_recent_event_pos_results, (
	           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
	AssertionError: We should not be seeing `None` here because we are still in the room (!room:example.com) and it should at-least have a join membership event that's keeping us here.
```
2024-08-22 19:06:04 -05:00
Eric Eastwood
21cc97ba9d Use simple_upsert_many_txn for sliding_sync_membership_snapshots
See https://github.com/element-hq/synapse/pull/17512#discussion_r1726820006
2024-08-22 18:52:05 -05:00
Eric Eastwood
fdb8b5931f Correct comment 2024-08-22 18:43:51 -05:00
Eric Eastwood
4b866c4fca Simplify what we need to think about to grab the best effort value 2024-08-22 18:36:43 -05:00
Eric Eastwood
088a4c7cf0 Use simple_upsert_txn to update sliding_sync_joined_rooms
See https://github.com/element-hq/synapse/pull/17512#discussion_r1726817206
2024-08-22 18:26:37 -05:00
Eric Eastwood
0726a6d58b Derive best effort stream_ordering outside of the transaction
See https://github.com/element-hq/synapse/pull/17512#discussion_r1727995882
2024-08-22 18:13:00 -05:00
Eric Eastwood
6edc4c78ce Allow for no bump_stamp (fix portdb CI job)
See https://github.com/element-hq/synapse/pull/17512#discussion_r1725998219
2024-08-22 17:09:43 -05:00
Eric Eastwood
44432e2118 Move tests to dedicated file
See https://github.com/element-hq/synapse/pull/17512#discussion_r1726849798
2024-08-22 16:56:09 -05:00
Eric Eastwood
bcba8cccfe No need for transaction
See https://github.com/element-hq/synapse/pull/17512#discussion_r1726844107
2024-08-22 16:52:31 -05:00
Eric Eastwood
693c06b2f1 Move away from backfill language 2024-08-22 16:48:02 -05:00
Eric Eastwood
4d87fa61c6 "backfill" -> "bg_update"
See https://github.com/element-hq/synapse/pull/17512#discussion_r1726837698
2024-08-22 16:44:02 -05:00
Eric Eastwood
d61aada8ba Simplify _update_sliding_sync_tables_with_new_persisted_events_txn()
See
https://github.com/element-hq/synapse/pull/17512#discussion_r1719997640
https://github.com/element-hq/synapse/pull/17512#discussion_r1726828894
https://github.com/element-hq/synapse/pull/17512#discussion_r1726836440
2024-08-22 16:35:59 -05:00
Eric Eastwood
6723824c4a Prefer simple_delete_many_txn
See https://github.com/element-hq/synapse/pull/17512#discussion_r1726819380
2024-08-22 15:47:44 -05:00
Eric Eastwood
980ee9aad6 Prefer simple_update_txn
See https://github.com/element-hq/synapse/pull/17512#discussion_r1726808112
2024-08-22 15:40:08 -05:00
Erik Johnston
03eac5ae60 Newsfile 2024-08-22 18:04:23 +01:00
Erik Johnston
ed7591cbef Remove mark_token_seen 2024-08-22 18:04:23 +01:00
Erik Johnston
3838b18d3b Store state 2024-08-22 18:04:23 +01:00
Erik Johnston
b3d8e2d2bd Add simple_insert_returning_txn 2024-08-22 18:04:23 +01:00
Erik Johnston
d1ee253bef Allow making columns AUTOINCREMENT 2024-08-22 18:04:23 +01:00
Erik Johnston
87d53368d7 Newsfile 2024-08-22 18:03:57 +01:00
Erik Johnston
e34d634778 Make PerConnectionState immutable 2024-08-22 18:03:57 +01:00
Erik Johnston
7087c7c3d5 Make RoomSyncConfig immutable 2024-08-22 18:03:57 +01:00
Eric Eastwood
fc73b6ffc9 Rename insert_key/insert_value
See https://github.com/element-hq/synapse/pull/17512#discussion_r1726805894
2024-08-22 11:44:57 -05:00
Eric Eastwood
9b8d2017af Check events_and_context for state events
See https://github.com/element-hq/synapse/pull/17512#discussion_r1726798803
2024-08-22 11:34:40 -05:00
Erik Johnston
5b77f4a67a Update mypy plugin to handle enums and typevars 2024-08-22 17:04:21 +01:00
Erik Johnston
e2ade85250 Move sliding sync types 2024-08-22 17:04:21 +01:00
Eric Eastwood
339500d067 Fix sub-query selecting multiple rows 2024-08-21 22:56:25 -05:00
Eric Eastwood
31300f4ce5 More docstring 2024-08-21 22:15:19 -05:00
Eric Eastwood
b45b1896aa Fill out docstring 2024-08-21 22:09:57 -05:00
Eric Eastwood
ee2ef0b4d9 Add forgotten column 2024-08-21 21:54:22 -05:00
Eric Eastwood
0a938b137a Add missing boolean column to portdb script 2024-08-21 19:59:35 -05:00
Eric Eastwood
97248362d0 Log which room is strange 2024-08-21 19:26:14 -05:00
Eric Eastwood
02711552cf Better handle none case 2024-08-21 19:11:08 -05:00
Eric Eastwood
8ddf5c7235 Add tombstone to tests 2024-08-21 19:05:59 -05:00
Eric Eastwood
513ec8e906 Update tests 2024-08-21 18:51:04 -05:00
Eric Eastwood
cda2311520 Add tombstone_successor_room_id column 2024-08-21 18:21:44 -05:00
Eric Eastwood
c612572d12 Move away from stream_id
See https://github.com/element-hq/synapse/pull/17512#discussion_r1725806543
2024-08-21 17:14:34 -05:00
Eric Eastwood
5b1db39bb7 Add sender column so we can tell leaves from kicks 2024-08-21 16:42:12 -05:00
Eric Eastwood
e7a3328228 Pre-populate membership and membership_event_stream_ordering
See https://github.com/element-hq/synapse/pull/17512#discussion_r1725311745
2024-08-21 16:20:00 -05:00
Eric Eastwood
f6d7ffd9c5 Move _calculate_sliding_sync_table_changes(...) after we assign stream_ordering to events
See https://github.com/element-hq/synapse/pull/17512#discussion_r1725728637
2024-08-21 16:10:14 -05:00
Eric Eastwood
772c501bb6 Use available stream_id
See https://github.com/element-hq/synapse/pull/17512#discussion_r1725310035
2024-08-21 15:04:51 -05:00
Eric Eastwood
cda92af4a6 No need to update event_stream_ordering/bump_stamp ON CONFLICT 2024-08-21 14:46:50 -05:00
Eric Eastwood
d3f90e4bd8 Get full events for _sliding_sync_joined_rooms_backfill 2024-08-21 14:39:54 -05:00
Eric Eastwood
a5e06c6a8d Move back to the main store 2024-08-21 11:14:15 -05:00
Eric Eastwood
0233e20aa3 Use full event version after solving the circular import issues 2024-08-21 10:44:49 -05:00
Eric Eastwood
357132db1d Go back to simpler fetching senders 2024-08-20 21:54:03 -05:00
Eric Eastwood
726a8e9698 Attempt getting real events in background update (needs work) 2024-08-20 21:48:44 -05:00
Eric Eastwood
cc200ee9f5 Merge branch 'develop' into madlittlemods/sliding-sync-pre-populate-room-meta-data 2024-08-20 17:25:48 -05:00
Eric Eastwood
3eb77c3a2a Add sanity checks and fix wrong variable usage 2024-08-20 17:24:43 -05:00
Eric Eastwood
45c89ec625 Move pre-processing completely outside transaction 2024-08-20 15:41:53 -05:00
Eric Eastwood
ac5b05c86b Use TypedDict 2024-08-20 13:29:56 -05:00
Eric Eastwood
2964c567d3 Use dicts 2024-08-20 13:21:19 -05:00
Eric Eastwood
95d39db772 Closer types 2024-08-20 11:55:24 -05:00
Eric Eastwood
6cc6bdbedf Start of moving logic outside of the transaction (pre-process) 2024-08-20 11:10:34 -05:00
Eric Eastwood
574a04a40f Test state reset on membership 2024-08-19 23:30:25 -05:00
Eric Eastwood
8ee2e114dd Add test to handle state reset in the meta data 2024-08-19 23:22:24 -05:00
Eric Eastwood
98fb56e5fe Prefer _update_sliding_sync_tables_with_new_persisted_events_txn(...) to do the right thing
See https://github.com/element-hq/synapse/pull/17512#discussion_r1719992152
2024-08-19 22:30:50 -05:00
Eric Eastwood
df0c57d383 Merge branch 'develop' into madlittlemods/sliding-sync-pre-populate-room-meta-data 2024-08-19 16:34:41 -05:00
Eric Eastwood
d2f5247e77 Update comment 2024-08-16 00:15:03 -05:00
Eric Eastwood
c89d859c7c Fill in docstrings 2024-08-15 23:52:01 -05:00
Eric Eastwood
2ec93e3f0d Move function next to other helpers 2024-08-15 23:39:39 -05:00
Eric Eastwood
fa63c02648 Fix lints 2024-08-15 23:30:16 -05:00
Eric Eastwood
419be7c6b2 Finish off background update tests 2024-08-15 23:29:29 -05:00
Eric Eastwood
ef5f0fca3a Add more tests 2024-08-15 23:18:50 -05:00
Eric Eastwood
fb5af8f5fa Add background update test for sliding_sync_membership_snapshots 2024-08-15 22:13:32 -05:00
Eric Eastwood
8461faf384 Add historical case to background update 2024-08-15 21:56:12 -05:00
Eric Eastwood
6c2fc1d20f Move background updates to StateBackgroundUpdateStore
So we can access `_get_state_groups_from_groups_txn(...)`
2024-08-15 20:51:43 -05:00
Eric Eastwood
cbeff57402 Use helper 2024-08-15 20:31:57 -05:00
Eric Eastwood
4b42e44ef9 Work on background update for sliding_sync_membership_snapshots 2024-08-15 00:21:35 -05:00
Eric Eastwood
d113e743ae Fix lints 2024-08-14 19:30:52 -05:00
Eric Eastwood
23e0d34a2d Add more tests 2024-08-14 19:30:22 -05:00
Eric Eastwood
1c931cb3e7 Add background update for sliding_sync_joined_rooms 2024-08-14 19:19:15 -05:00
Eric Eastwood
9f551f0e97 Fix lints 2024-08-14 11:32:33 -05:00
Eric Eastwood
c8508f113a Clean up tables when a room is purged/deleted 2024-08-14 11:27:57 -05:00
Eric Eastwood
f49003c35c No invites needed 2024-08-13 18:55:59 -05:00
Eric Eastwood
8b0e1692f9 More realistic remote room forgotten test 2024-08-13 18:51:11 -05:00
Eric Eastwood
5df94f47b5 Fix running into StopIteration
More context about how/why `StopIteration` was being ignored silently
which made this problem harder to debug. See
https://github.com/element-hq/synapse/pull/17512#discussion_r1715954505
2024-08-13 17:15:09 -05:00
Eric Eastwood
3566abd9bc Fix boolean schema for Postgres 2024-08-13 15:14:48 -05:00
Eric Eastwood
96a4614f92 Update fixme comment 2024-08-13 14:50:11 -05:00
Eric Eastwood
dc447a673f Clarify when/why we upsert 2024-08-13 14:47:17 -05:00
Eric Eastwood
32ae162278 Fix rejecting invite when no_longer_in_room (and other non-join transitions) 2024-08-13 14:35:24 -05:00
Eric Eastwood
a90f3d4ae2 Merge branch 'develop' into madlittlemods/sliding-sync-pre-populate-room-meta-data 2024-08-13 12:28:36 -05:00
Eric Eastwood
eb3a185cfc Fix federating backfill test 2024-08-13 12:24:53 -05:00
Eric Eastwood
517946d940 Fix lints 2024-08-12 20:31:25 -05:00
Eric Eastwood
f600eacd0d Adjust test description 2024-08-12 20:30:48 -05:00
Eric Eastwood
3423eb72d5 Add test to make sure snapshot evolves with membership 2024-08-12 20:29:58 -05:00
Eric Eastwood
5589ae48ca Add test for remote invite rejected/retracted 2024-08-12 20:14:14 -05:00
Eric Eastwood
83a5858083 Add tests for remote invites 2024-08-12 19:57:28 -05:00
Eric Eastwood
3e1f24ea11 User ID is not unique because user is joined to many rooms 2024-08-12 19:46:23 -05:00
Eric Eastwood
ab074f5335 Fix events from rooms we're not joined to affecting the joined room stream ordering 2024-08-12 19:40:53 -05:00
Eric Eastwood
53232e6df5 Fill in for remote invites (out of band, outlier membership) 2024-08-12 18:14:02 -05:00
Eric Eastwood
f069659343 Fix lints 2024-08-12 15:49:40 -05:00
Eric Eastwood
552f8f496d Update descriptions 2024-08-12 15:43:06 -05:00
Eric Eastwood
0af3b4822c Refactor to sliding_sync_membership_snapshots 2024-08-12 15:10:44 -05:00
Eric Eastwood
ed47a7eff5 Fix bumping when events are persisted out of order 2024-08-12 11:27:17 -05:00
Eric Eastwood
3367422fd3 Need to fix upsert 2024-08-08 18:23:50 -05:00
Eric Eastwood
ca909013c8 Fill in stream_ordering/bump_stamp for any event being persisted 2024-08-08 17:49:15 -05:00
Eric Eastwood
cc2d2b6b9f Fill in stream_ordering/bump_stamp when we add current state to the joined rooms table 2024-08-08 15:41:55 -05:00
Eric Eastwood
bc3796d333 Fix some lints 2024-08-07 20:49:46 -05:00
Eric Eastwood
5cf3ad3d7f Handle server left room 2024-08-07 20:47:13 -05:00
Eric Eastwood
bf78692ba0 Handle to_delete 2024-08-07 20:09:53 -05:00
Eric Eastwood
a1aaa47dad Add more tests 2024-08-07 19:58:51 -05:00
Eric Eastwood
c590474757 Test non-joins 2024-08-07 19:24:58 -05:00
Eric Eastwood
5b1053f23e Better test assertions 2024-08-07 19:07:43 -05:00
Eric Eastwood
68a3daf605 Fix comparison and insert 2024-08-07 18:10:24 -05:00
Eric Eastwood
61cea4e9b7 Closer to right 2024-08-07 18:07:53 -05:00
Eric Eastwood
87d95615d4 Change to updating the latest membership in the room 2024-08-07 16:37:17 -05:00
Eric Eastwood
cb335805d4 Server left room test 2024-08-07 10:46:34 -05:00
Eric Eastwood
2f3bd27284 Test is running 2024-08-06 16:50:14 -05:00
Eric Eastwood
f96d0c36a3 Special treatment for boolean columns
See 1dfa59b238/docs/development/database_schema.md (boolean-columns)
2024-08-06 15:30:51 -05:00
Eric Eastwood
1a251d5211 Fill in sliding_sync_non_join_memberships when current state changes 2024-08-06 15:18:34 -05:00
Eric Eastwood
2b5f07d714 Start of updating sliding_sync_joined_rooms 2024-08-05 22:34:21 -05:00
Eric Eastwood
ad1c887b4c Merge branch 'develop' into madlittlemods/sliding-sync-pre-populate-room-meta-data 2024-08-05 18:19:17 -05:00
Eric Eastwood
8392d6ac3b Use foreign keys 2024-07-31 19:00:55 -05:00
Eric Eastwood
e7e9cb289d Add changelog 2024-07-31 18:43:50 -05:00
Eric Eastwood
d26ac746d4 Start thinking about schemas 2024-07-31 18:40:23 -05:00
39 changed files with 8505 additions and 728 deletions

1
changelog.d/17512.misc Normal file
View File

@@ -0,0 +1 @@
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.

1
changelog.d/17599.misc Normal file
View File

@@ -0,0 +1 @@
Store sliding sync per-connection state in the database.

1
changelog.d/17600.misc Normal file
View File

@@ -0,0 +1 @@
Make the sliding sync `PerConnectionState` class immutable.

View File

@@ -38,6 +38,7 @@ from mypy.types import (
NoneType,
TupleType,
TypeAliasType,
TypeVarType,
UninhabitedType,
UnionType,
)
@@ -233,6 +234,7 @@ IMMUTABLE_CUSTOM_TYPES = {
"synapse.synapse_rust.push.FilteredPushRules",
# This is technically not immutable, but close enough.
"signedjson.types.VerifyKey",
"synapse.types.StrCollection",
}
# Immutable containers only if the values are also immutable.
@@ -298,7 +300,7 @@ def is_cacheable(
elif rt.type.fullname in MUTABLE_CONTAINER_TYPES:
# Mutable containers are mutable regardless of their underlying type.
return False, None
return False, f"container {rt.type.fullname} is mutable"
elif "attrs" in rt.type.metadata:
# attrs classes are cacheable iff they are frozen (i.e. immutable themselves)
@@ -318,6 +320,9 @@ def is_cacheable(
else:
return False, "non-frozen attrs class"
elif rt.type.is_enum:
# We assume Enum values are immutable
return True, None
else:
# Ensure we fail for unknown types; this generally means that the
# above code is not complete.
@@ -326,6 +331,18 @@ def is_cacheable(
f"Don't know how to handle {rt.type.fullname} return type instance",
)
elif isinstance(rt, TypeVarType):
# We consider TypeVars immutable if they are constrained to a set of
# immutable types (i.e. every value restriction is itself cacheable).
if rt.values:
for value in rt.values:
ok, note = is_cacheable(value, signature, verbose)
if not ok:
return False, f"TypeVar bound not cacheable {value}"
return True, None
return False, "TypeVar is unbound"
elif isinstance(rt, NoneType):
# None is cachable.
return True, None

View File

@@ -129,6 +129,11 @@ BOOLEAN_COLUMNS = {
"remote_media_cache": ["authenticated"],
"room_stats_state": ["is_federatable"],
"rooms": ["is_public", "has_auth_chain_index"],
"sliding_sync_joined_rooms": ["is_encrypted"],
"sliding_sync_membership_snapshots": [
"has_known_state",
"is_encrypted",
],
"users": ["shadow_banned", "approved", "locked", "suspended"],
"un_partial_stated_event_stream": ["rejection_status_changed"],
"users_who_share_rooms": ["share_private"],

View File

@@ -245,6 +245,8 @@ class EventContentFields:
# `m.room.encryption` algorithm field
ENCRYPTION_ALGORITHM: Final = "algorithm"
TOMBSTONE_SUCCESSOR_ROOM: Final = "replacement_room"
class EventUnsignedContentFields:
"""Fields found inside the 'unsigned' data on events"""

View File

@@ -98,6 +98,7 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.databases.main.search import SearchStore
from synapse.storage.databases.main.session import SessionStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.stream import StreamWorkerStore
@@ -159,6 +160,7 @@ class GenericWorkerStore(
SessionStore,
TaskSchedulerWorkerStore,
ExperimentalFeaturesStore,
SlidingSyncStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.

View File

@@ -17,6 +17,7 @@ import logging
from itertools import chain
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
List,
@@ -45,13 +46,6 @@ from synapse.events.utils import parse_stripped_state_event, strip_event
from synapse.handlers.relations import BundledAggregations
from synapse.handlers.sliding_sync.extensions import SlidingSyncExtensionHandler
from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore
from synapse.handlers.sliding_sync.types import (
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
RoomSyncConfig,
StateValues,
)
from synapse.logging.opentracing import (
SynapseTags,
log_kv,
@@ -83,7 +77,17 @@ from synapse.types import (
StreamToken,
UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.handlers.sliding_sync import (
SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
HaveSentRoomFlag,
MutablePerConnectionState,
OperationType,
PerConnectionState,
RoomSyncConfig,
SlidingSyncConfig,
SlidingSyncResult,
StateValues,
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import concurrently_execute
from synapse.visibility import filter_events_for_client
@@ -107,18 +111,6 @@ class Sentinel(enum.Enum):
UNSET_SENTINEL = object()
# The event types that clients should consider as new activity.
DEFAULT_BUMP_EVENT_TYPES = {
EventTypes.Create,
EventTypes.Message,
EventTypes.Encrypted,
EventTypes.Sticker,
EventTypes.CallInvite,
EventTypes.PollStart,
EventTypes.LiveLocationShareStart,
}
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _RoomMembershipForUser:
"""
@@ -206,7 +198,7 @@ class SlidingSyncHandler:
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
self.is_mine_id = hs.is_mine_id
self.connection_store = SlidingSyncConnectionStore()
self.connection_store = SlidingSyncConnectionStore(self.store)
self.extensions = SlidingSyncExtensionHandler(hs)
async def wait_for_sync_for_user(
@@ -330,11 +322,7 @@ class SlidingSyncHandler:
sync_config, from_token
)
)
await self.connection_store.mark_token_seen(
sync_config=sync_config,
from_token=from_token,
)
new_connection_state = previous_connection_state.get_mutable()
# Get all of the room IDs that the user should be able to see in the sync
# response
@@ -352,6 +340,10 @@ class SlidingSyncHandler:
)
)
lists_to_rooms: Mapping[str, AbstractSet[str]] = {}
if previous_connection_state is not None:
lists_to_rooms = previous_connection_state.list_to_rooms
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
# Keep track of the rooms that we can display and need to fetch more info about
@@ -368,13 +360,26 @@ class SlidingSyncHandler:
for list_key, list_config in sync_config.lists.items():
# Apply filters
filtered_sync_room_map = sync_room_map
if list_config.filters is not None:
filtered_sync_room_map = await self.filter_rooms(
sync_config.user,
sync_room_map,
list_config.filters,
to_token,
previous_found_rooms = lists_to_rooms.get(list_key)
if previous_found_rooms:
filtered_sync_room_map = {
room_id: sync_room_map[room_id]
for room_id in previous_found_rooms
}
# TODO: Record changes to the list.
else:
filtered_sync_room_map = sync_room_map
if list_config.filters is not None:
filtered_sync_room_map = await self.filter_rooms(
sync_config.user,
sync_room_map,
list_config.filters,
to_token,
)
new_connection_state.list_to_rooms[list_key] = set(
filtered_sync_room_map.keys()
)
# Find which rooms are partially stated and may need to be filtered out
@@ -430,15 +435,11 @@ class SlidingSyncHandler:
room_id
)
if existing_room_sync_config is not None:
existing_room_sync_config.combine_room_sync_config(
room_sync_config = existing_room_sync_config.combine_room_sync_config(
room_sync_config
)
else:
# Make a copy so if we modify it later, it doesn't
# affect all references.
relevant_room_map[room_id] = (
room_sync_config.deep_copy()
)
relevant_room_map[room_id] = room_sync_config
room_ids_in_list.append(room_id)
@@ -503,11 +504,13 @@ class SlidingSyncHandler:
# and need to fetch more info about.
existing_room_sync_config = relevant_room_map.get(room_id)
if existing_room_sync_config is not None:
existing_room_sync_config.combine_room_sync_config(
room_sync_config
room_sync_config = (
existing_room_sync_config.combine_room_sync_config(
room_sync_config
)
)
else:
relevant_room_map[room_id] = room_sync_config
relevant_room_map[room_id] = room_sync_config
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@@ -569,8 +572,6 @@ class SlidingSyncHandler:
if room_id in rooms_should_send
}
new_connection_state = previous_connection_state.get_mutable()
@trace
@tag_args
async def handle_room(room_id: str) -> None:
@@ -2229,7 +2230,9 @@ class SlidingSyncHandler:
# Figure out the last bump event in the room
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
room_id,
to_token.room_key,
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)

View File

@@ -19,11 +19,6 @@ from typing_extensions import assert_never
from synapse.api.constants import AccountDataTypes
from synapse.handlers.receipts import ReceiptEventSource
from synapse.handlers.sliding_sync.types import (
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
)
from synapse.logging.opentracing import trace
from synapse.types import (
DeviceListUpdates,
@@ -32,7 +27,14 @@ from synapse.types import (
SlidingSyncStreamToken,
StreamToken,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
MutablePerConnectionState,
OperationType,
PerConnectionState,
SlidingSyncConfig,
SlidingSyncResult,
)
if TYPE_CHECKING:
from synapse.server import HomeServer

View File

@@ -13,18 +13,18 @@
#
import logging
from typing import TYPE_CHECKING, Dict, Optional, Tuple
from typing import TYPE_CHECKING, Optional
import attr
from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.handlers.sliding_sync.types import (
from synapse.logging.opentracing import trace
from synapse.storage.databases.main import DataStore
from synapse.types import SlidingSyncStreamToken
from synapse.types.handlers.sliding_sync import (
MutablePerConnectionState,
PerConnectionState,
SlidingSyncConfig,
)
from synapse.logging.opentracing import trace
from synapse.types import SlidingSyncStreamToken
from synapse.types.handlers import SlidingSyncConfig
if TYPE_CHECKING:
pass
@@ -61,20 +61,7 @@ class SlidingSyncConnectionStore:
to mapping of room ID to `HaveSentRoom`.
"""
# `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState`
_connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory(
dict
)
async def is_valid_token(
self, sync_config: SlidingSyncConfig, connection_token: int
) -> bool:
"""Return whether the connection token is valid/recognized"""
if connection_token == 0:
return True
conn_key = self._get_connection_key(sync_config)
return connection_token in self._connections.get(conn_key, {})
store: "DataStore"
async def get_per_connection_state(
self,
@@ -86,23 +73,20 @@ class SlidingSyncConnectionStore:
Raises:
SlidingSyncUnknownPosition if the connection_token is unknown
"""
if from_token is None:
if from_token is None or from_token.connection_position == 0:
return PerConnectionState()
connection_position = from_token.connection_position
if connection_position == 0:
# Initial sync (request without a `from_token`) starts at `0` so
# there is no existing per-connection state
return PerConnectionState()
conn_id = sync_config.conn_id or ""
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.get(conn_key, {})
connection_state = sync_statuses.get(connection_position)
device_id = sync_config.requester.device_id
assert device_id is not None
if connection_state is None:
raise SlidingSyncUnknownPosition()
return connection_state
return await self.store.get_per_connection_state(
sync_config.user.to_string(),
device_id,
conn_id,
from_token.connection_position,
)
@trace
async def record_new_state(
@@ -116,85 +100,24 @@ class SlidingSyncConnectionStore:
If there are no changes to the state this may return the same token as
the existing per-connection state.
"""
prev_connection_token = 0
if from_token is not None:
prev_connection_token = from_token.connection_position
if not new_connection_state.has_updates():
return prev_connection_token
if from_token is not None:
return from_token.connection_position
else:
return 0
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.setdefault(conn_key, {})
if from_token is not None and from_token.connection_position == 0:
from_token = None
# Generate a new token, removing any existing entries in that token
# (which can happen if requests get resent).
new_store_token = prev_connection_token + 1
sync_statuses.pop(new_store_token, None)
# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
# don't grow forever.
sync_statuses[new_store_token] = new_connection_state.copy()
return new_store_token
@trace
async def mark_token_seen(
self,
sync_config: SlidingSyncConfig,
from_token: Optional[SlidingSyncStreamToken],
) -> None:
"""We have received a request with the given token, so we can clear out
any other tokens associated with the connection.
If there is no from token then we have started afresh, and so we delete
all tokens associated with the device.
"""
# Clear out any tokens for the connection that doesn't match the one
# from the request.
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.pop(conn_key, {})
if from_token is None:
return
sync_statuses = {
connection_token: room_statuses
for connection_token, room_statuses in sync_statuses.items()
if connection_token == from_token.connection_position
}
if sync_statuses:
self._connections[conn_key] = sync_statuses
@staticmethod
def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]:
"""Return a unique identifier for this connection.
The first part is simply the user ID.
The second part is generally a combination of device ID and conn_id.
However, both these two are optional (e.g. puppet access tokens don't
have device IDs), so this handles those edge cases.
We use this over the raw `conn_id` to avoid clashes between different
clients that use the same `conn_id`. Imagine a user uses a web client
that uses `conn_id: main_sync_loop` and an Android client that also has
a `conn_id: main_sync_loop`.
"""
user_id = sync_config.user.to_string()
# Only one sliding sync connection is allowed per given conn_id (empty
# or not).
conn_id = sync_config.conn_id or ""
if sync_config.requester.device_id:
return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}")
device_id = sync_config.requester.device_id
assert device_id is not None
if sync_config.requester.access_token_id:
# If we don't have a device, then the access token ID should be a
# stable ID.
return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}")
# If we have neither then its likely an AS or some weird token. Either
# way we can just fail here.
raise Exception("Cannot use sliding sync with access token type")
return await self.store.persist_per_connection_state(
sync_config.user.to_string(),
device_id,
conn_id,
from_token.connection_position if from_token else None,
new_connection_state,
)

View File

@@ -502,8 +502,15 @@ class EventsPersistenceStorageController:
"""
state = await self._calculate_current_state(room_id)
delta = await self._calculate_state_delta(room_id, state)
sliding_sync_table_changes = (
await self.persist_events_store._calculate_sliding_sync_table_changes(
room_id, [], delta
)
)
await self.persist_events_store.update_current_state(room_id, delta)
await self.persist_events_store.update_current_state(
room_id, delta, sliding_sync_table_changes
)
async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
"""Calculate the current state of a room, based on the forward extremities

View File

@@ -35,6 +35,7 @@ from typing import (
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Tuple,
@@ -64,6 +65,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Connection, Cursor, SQLQueryParameters
from synapse.types import StrCollection
from synapse.util.async_helpers import delay_cancellation
from synapse.util.iterutils import batch_iter
@@ -1095,6 +1097,50 @@ class DatabasePool:
txn.execute(sql, vals)
@staticmethod
def simple_insert_returning_txn(
    txn: LoggingTransaction,
    table: str,
    values: Dict[str, Any],
    returning: StrCollection,
) -> Tuple[Any, ...]:
    """Insert a single row and hand back selected columns of that new row.

    Uses `INSERT INTO... RETURNING...` when the database engine supports it,
    and otherwise emulates it (for SQLite versions without `RETURNING`).

    Args:
        txn: The transaction to run the statements in.
        table: Name of the table to insert into.
        values: Mapping of column name to the value to insert.
        returning: Names of the columns to read back from the inserted row.

    Returns:
        The requested columns of the freshly inserted row.
    """
    if not txn.database_engine.supports_returning:
        # Old SQLite: do a plain insert, ask SQLite for the rowid of the row
        # we just wrote, then select the requested columns for that rowid.
        DatabasePool.simple_insert_txn(
            txn,
            table=table,
            values=values,
        )

        txn.execute("SELECT last_insert_rowid()")
        rowid_row = txn.fetchone()
        assert rowid_row is not None
        (rowid,) = rowid_row

        inserted_row = DatabasePool.simple_select_one_txn(
            txn, table=table, keyvalues={"rowid": rowid}, retcols=returning
        )
        assert inserted_row is not None
        return inserted_row

    columns, params = zip(*values.items())

    column_list = ", ".join(columns)
    placeholder_list = ", ".join("?" * len(columns))
    returning_list = ", ".join(returning)

    sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % (
        table,
        column_list,
        placeholder_list,
        returning_list,
    )
    txn.execute(sql, params)

    returned_row = txn.fetchone()
    assert returned_row is not None
    return returned_row
async def simple_insert_many(
self,
table: str,
@@ -1254,9 +1300,9 @@ class DatabasePool:
self,
txn: LoggingTransaction,
table: str,
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
keyvalues: Mapping[str, Any],
values: Mapping[str, Any],
insertion_values: Optional[Mapping[str, Any]] = None,
where_clause: Optional[str] = None,
) -> bool:
"""
@@ -1299,9 +1345,9 @@ class DatabasePool:
self,
txn: LoggingTransaction,
table: str,
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
keyvalues: Mapping[str, Any],
values: Mapping[str, Any],
insertion_values: Optional[Mapping[str, Any]] = None,
where_clause: Optional[str] = None,
lock: bool = True,
) -> bool:
@@ -1322,7 +1368,7 @@ class DatabasePool:
if lock:
# We need to lock the table :(
self.engine.lock_table(txn, table)
txn.database_engine.lock_table(txn, table)
def _getwhere(key: str) -> str:
# If the value we're passing in is None (aka NULL), we need to use
@@ -1376,13 +1422,13 @@ class DatabasePool:
# successfully inserted
return True
@staticmethod
def simple_upsert_txn_native_upsert(
self,
txn: LoggingTransaction,
table: str,
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
keyvalues: Mapping[str, Any],
values: Mapping[str, Any],
insertion_values: Optional[Mapping[str, Any]] = None,
where_clause: Optional[str] = None,
) -> bool:
"""
@@ -1535,8 +1581,8 @@ class DatabasePool:
self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)
@staticmethod
def simple_upsert_many_txn_native_upsert(
self,
txn: LoggingTransaction,
table: str,
key_names: Collection[str],
@@ -1966,8 +2012,8 @@ class DatabasePool:
def simple_update_txn(
txn: LoggingTransaction,
table: str,
keyvalues: Dict[str, Any],
updatevalues: Dict[str, Any],
keyvalues: Mapping[str, Any],
updatevalues: Mapping[str, Any],
) -> int:
"""
Update rows in the given database table.

View File

@@ -33,6 +33,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
from synapse.storage.databases.main.stats import UserSortOrder
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.types import Cursor
@@ -156,6 +157,7 @@ class DataStore(
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
SlidingSyncStore,
):
def __init__(
self,

View File

@@ -313,6 +313,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,))
# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -457,6 +457,8 @@ class EventsWorkerStore(SQLBaseStore):
) -> Optional[EventBase]:
"""Get an event from the database by event_id.
Events for unknown room versions will also be filtered out.
Args:
event_id: The event_id of the event to fetch
@@ -511,6 +513,10 @@ class EventsWorkerStore(SQLBaseStore):
) -> Dict[str, EventBase]:
"""Get events from the database
Unknown events will be omitted from the response.
Events for unknown room versions will also be filtered out.
Args:
event_ids: The event_ids of the events to fetch
@@ -553,6 +559,8 @@ class EventsWorkerStore(SQLBaseStore):
Unknown events will be omitted from the response.
Events for unknown room versions will also be filtered out.
Args:
event_ids: The event_ids of the events to fetch

View File

@@ -454,6 +454,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# so must be deleted first.
"local_current_membership",
"room_memberships",
# Note: the sliding_sync_ tables have foreign keys to the `events` table
# so must be deleted first.
"sliding_sync_joined_rooms",
"sliding_sync_membership_snapshots",
"events",
"federation_inbound_events_staging",
"receipts_graph",

View File

@@ -1337,6 +1337,12 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
keyvalues={"user_id": user_id, "room_id": room_id},
updatevalues={"forgotten": 1},
)
self.db_pool.simple_update_txn(
txn,
table="sliding_sync_membership_snapshots",
keyvalues={"user_id": user_id, "room_id": room_id},
updatevalues={"forgotten": 1},
)
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
self._invalidate_cache_and_stream(

View File

@@ -0,0 +1,506 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, cast
import attr
from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.logging.opentracing import log_kv
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.types import MultiWriterStreamToken, RoomStreamToken
from synapse.types.handlers.sliding_sync import (
HaveSentRoom,
HaveSentRoomFlag,
MutablePerConnectionState,
PerConnectionState,
RoomStatusMap,
RoomSyncConfig,
)
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
from synapse.storage.databases.main import DataStore
class SlidingSyncStore(SQLBaseStore):
    """Database-backed storage of the per-connection state of sliding sync
    connections.

    Each connection is identified by a (user ID, device ID, conn ID) triple and
    has a series of connection positions, each with its own room statuses and
    room configs.
    """

    async def persist_per_connection_state(
        self,
        user_id: str,
        device_id: str,
        conn_id: str,
        previous_connection_position: Optional[int],
        per_connection_state: "MutablePerConnectionState",
    ) -> int:
        """Persist updates to the per-connection state for a sliding sync
        connection.

        Args:
            user_id: The user the connection belongs to.
            device_id: The device the connection belongs to.
            conn_id: The client-chosen connection ID.
            previous_connection_position: The position the updates are based
                on, or None to start a brand new connection (which clears any
                existing connection for the same user/device/conn ID).
            per_connection_state: The state whose updates should be persisted.

        Returns:
            The connection position of the newly persisted state.

        Raises:
            SlidingSyncUnknownPosition: if `previous_connection_position` is
                not a known position of this connection.
        """

        store = cast("DataStore", self)
        return await self.db_pool.runInteraction(
            "persist_per_connection_state",
            self.persist_per_connection_state_txn,
            user_id=user_id,
            device_id=device_id,
            conn_id=conn_id,
            previous_connection_position=previous_connection_position,
            # Stream tokens are serialized to strings before entering the
            # transaction, as that conversion is async.
            per_connection_state=await PerConnectionStateDB.from_state(
                per_connection_state, store
            ),
        )

    def persist_per_connection_state_txn(
        self,
        txn: LoggingTransaction,
        user_id: str,
        device_id: str,
        conn_id: str,
        previous_connection_position: Optional[int],
        per_connection_state: "PerConnectionStateDB",
    ) -> int:
        """Transaction body for `persist_per_connection_state`.

        Returns:
            The newly allocated connection position.

        Raises:
            SlidingSyncUnknownPosition: if `previous_connection_position` is
                not a known position of this connection.
        """
        # First we fetch (or create) the connection key associated with the
        # previous connection position.
        if previous_connection_position is not None:
            # The `previous_connection_position` is a user-supplied value, so we
            # need to make sure that the one they supplied is actually theirs.
            sql = """
                SELECT connection_key
                FROM sliding_sync_connection_positions
                INNER JOIN sliding_sync_connections USING (connection_key)
                WHERE
                    connection_position = ?
                    AND user_id = ? AND device_id = ? AND conn_id = ?
            """
            txn.execute(
                sql, (previous_connection_position, user_id, device_id, conn_id)
            )
            row = txn.fetchone()
            if row is None:
                raise SlidingSyncUnknownPosition()

            (connection_key,) = row
        else:
            # We're restarting the connection, so we clear all existing
            # connections. We do this here to ensure that if we get lots of
            # one-shot requests we don't stack up lots of entries.
            self.db_pool.simple_delete_txn(
                txn,
                table="sliding_sync_connections",
                keyvalues={
                    "user_id": user_id,
                    "device_id": device_id,
                    "conn_id": conn_id,
                },
            )

            (connection_key,) = self.db_pool.simple_insert_returning_txn(
                txn,
                table="sliding_sync_connections",
                values={
                    "user_id": user_id,
                    "device_id": device_id,
                    "conn_id": conn_id,
                    "created_ts": self._clock.time_msec(),
                },
                returning=("connection_key",),
            )

        # Define a new connection position for the updates
        (connection_position,) = self.db_pool.simple_insert_returning_txn(
            txn,
            table="sliding_sync_connection_positions",
            values={
                "connection_key": connection_key,
                "created_ts": self._clock.time_msec(),
            },
            returning=("connection_position",),
        )

        # We need to deduplicate the `required_state` JSON. We do this by
        # fetching all JSON associated with the connection and comparing that
        # with the updates to `required_state`

        # Dict from required state json -> required state ID
        required_state_to_id: Dict[str, int] = {}
        if previous_connection_position is not None:
            rows = self.db_pool.simple_select_list_txn(
                txn,
                table="sliding_sync_connection_required_state",
                keyvalues={"connection_key": connection_key},
                retcols=("required_state_id", "required_state"),
            )
            for required_state_id, required_state in rows:
                required_state_to_id[required_state] = required_state_id

        room_to_state_ids: Dict[str, int] = {}
        unique_required_state: Dict[str, List[str]] = {}
        for room_id, room_state in per_connection_state.room_configs.items():
            serialized_state = json_encoder.encode(
                # We store the required state as a sorted list of event type /
                # state key tuples.
                sorted(
                    (event_type, state_key)
                    for event_type, state_keys in room_state.required_state_map.items()
                    for state_key in state_keys
                )
            )

            existing_state_id = required_state_to_id.get(serialized_state)
            if existing_state_id is not None:
                room_to_state_ids[room_id] = existing_state_id
            else:
                unique_required_state.setdefault(serialized_state, []).append(room_id)

        # Insert any new `required_state` json we haven't previously seen.
        for serialized_required_state, room_ids in unique_required_state.items():
            (required_state_id,) = self.db_pool.simple_insert_returning_txn(
                txn,
                table="sliding_sync_connection_required_state",
                values={
                    "connection_key": connection_key,
                    "required_state": serialized_required_state,
                },
                returning=("required_state_id",),
            )
            for room_id in room_ids:
                room_to_state_ids[room_id] = required_state_id

        # Copy over state from the previous connection position (we'll overwrite
        # these rows with any changes).
        if previous_connection_position is not None:
            sql = """
                INSERT INTO sliding_sync_connection_streams
                (connection_position, stream, room_id, room_status, last_position)
                SELECT ?, stream, room_id, room_status, last_position
                FROM sliding_sync_connection_streams
                WHERE connection_position = ?
            """
            txn.execute(sql, (connection_position, previous_connection_position))

            sql = """
                INSERT INTO sliding_sync_connection_room_configs
                (connection_position, room_id, timeline_limit, required_state_id)
                SELECT ?, room_id, timeline_limit, required_state_id
                FROM sliding_sync_connection_room_configs
                WHERE connection_position = ?
            """
            txn.execute(sql, (connection_position, previous_connection_position))

        # We now upsert the changes to the various streams.
        key_values = []
        value_values = []
        for room_id, have_sent_room in per_connection_state.rooms._statuses.items():
            key_values.append((connection_position, "rooms", room_id))
            value_values.append(
                (have_sent_room.status.value, have_sent_room.last_token)
            )

        for room_id, have_sent_room in per_connection_state.receipts._statuses.items():
            key_values.append((connection_position, "receipts", room_id))
            value_values.append(
                (have_sent_room.status.value, have_sent_room.last_token)
            )

        self.db_pool.simple_upsert_many_txn(
            txn,
            table="sliding_sync_connection_streams",
            key_names=(
                "connection_position",
                "stream",
                "room_id",
            ),
            key_values=key_values,
            value_names=(
                "room_status",
                "last_position",
            ),
            value_values=value_values,
        )

        # ... and upsert changes to the room configs.
        keys = []
        values = []
        for room_id, room_config in per_connection_state.room_configs.items():
            keys.append((connection_position, room_id))
            values.append((room_config.timeline_limit, room_to_state_ids[room_id]))

        self.db_pool.simple_upsert_many_txn(
            txn,
            table="sliding_sync_connection_room_configs",
            key_names=(
                "connection_position",
                "room_id",
            ),
            key_values=keys,
            value_names=(
                "timeline_limit",
                "required_state_id",
            ),
            value_values=values,
        )

        # Persist changes to the room lists. Lists are stored per connection
        # (not per position), so each updated list is replaced wholesale.
        for list_name, list_room_ids in per_connection_state.list_to_rooms.items():
            self.db_pool.simple_delete_txn(
                txn,
                table="sliding_sync_connection_room_lists",
                keyvalues={"connection_key": connection_key, "list_name": list_name},
            )
            self.db_pool.simple_insert_many_txn(
                txn,
                table="sliding_sync_connection_room_lists",
                keys=("connection_key", "list_name", "room_id"),
                values=[
                    (connection_key, list_name, room_id) for room_id in list_room_ids
                ],
            )

        return connection_position

    @cached(iterable=True, max_entries=100000)
    async def get_per_connection_state(
        self, user_id: str, device_id: str, conn_id: str, connection_position: int
    ) -> "PerConnectionState":
        """Get the per-connection state for the given connection position.

        As a side effect, all other positions stored for the same connection
        are deleted.

        Raises:
            SlidingSyncUnknownPosition: if `connection_position` is not a known
                position of the given user/device/conn ID.
        """

        per_connection_state_db = await self.db_pool.runInteraction(
            "get_per_connection_state",
            self._get_per_connection_state_txn,
            user_id=user_id,
            device_id=device_id,
            conn_id=conn_id,
            connection_position=connection_position,
        )
        store = cast("DataStore", self)
        return await per_connection_state_db.to_state(store)

    def _get_per_connection_state_txn(
        self,
        txn: LoggingTransaction,
        user_id: str,
        device_id: str,
        conn_id: str,
        connection_position: int,
    ) -> "PerConnectionStateDB":
        """Transaction body for `get_per_connection_state`.

        Raises:
            SlidingSyncUnknownPosition: if `connection_position` is not a known
                position of the given user/device/conn ID.
        """
        # The `connection_position` is a user-supplied value, so we need to
        # make sure that the one they supplied is actually theirs.
        sql = """
            SELECT connection_key
            FROM sliding_sync_connection_positions
            INNER JOIN sliding_sync_connections USING (connection_key)
            WHERE
                connection_position = ?
                AND user_id = ? AND device_id = ? AND conn_id = ?
        """
        txn.execute(sql, (connection_position, user_id, device_id, conn_id))
        row = txn.fetchone()
        if row is None:
            raise SlidingSyncUnknownPosition()

        (connection_key,) = row

        # Now that we have seen the client has received and used the connection
        # position, we can delete all the other connection positions.
        sql = """
            DELETE FROM sliding_sync_connection_positions
            WHERE connection_key = ? AND connection_position != ?
        """
        txn.execute(sql, (connection_key, connection_position))

        # Fetch and create a mapping from required state ID to the actual
        # required state for the connection.
        required_state_rows = self.db_pool.simple_select_list_txn(
            txn,
            table="sliding_sync_connection_required_state",
            keyvalues={"connection_key": connection_key},
            retcols=(
                "required_state_id",
                "required_state",
            ),
        )

        required_state_map: Dict[int, Dict[str, Set[str]]] = {}
        for required_state_id, required_state_json in required_state_rows:
            state: Dict[str, Set[str]] = {}
            required_state_map[required_state_id] = state
            # The JSON is a list of `(event_type, state_key)` pairs (one pair
            # per state key), so we have to regroup the state keys by event
            # type rather than treating the second element as a collection.
            for event_type, state_key in db_to_json(required_state_json):
                state.setdefault(event_type, set()).add(state_key)

        # Get all the room configs, looking up the required state from the map
        # above.
        room_config_rows = self.db_pool.simple_select_list_txn(
            txn,
            table="sliding_sync_connection_room_configs",
            keyvalues={"connection_position": connection_position},
            retcols=(
                "room_id",
                "timeline_limit",
                "required_state_id",
            ),
        )

        room_configs: Dict[str, RoomSyncConfig] = {}
        for (
            room_id,
            timeline_limit,
            required_state_id,
        ) in room_config_rows:
            room_configs[room_id] = RoomSyncConfig(
                timeline_limit=timeline_limit,
                required_state_map=required_state_map[required_state_id],
            )

        # Now look up the per-room stream data.
        rooms: Dict[str, HaveSentRoom[str]] = {}
        receipts: Dict[str, HaveSentRoom[str]] = {}

        stream_rows = self.db_pool.simple_select_list_txn(
            txn,
            table="sliding_sync_connection_streams",
            keyvalues={"connection_position": connection_position},
            retcols=(
                "stream",
                "room_id",
                "room_status",
                "last_position",
            ),
        )
        for stream, room_id, room_status, last_position in stream_rows:
            have_sent_room: HaveSentRoom[str] = HaveSentRoom(
                status=HaveSentRoomFlag(room_status), last_token=last_position
            )
            if stream == "rooms":
                rooms[room_id] = have_sent_room
            elif stream == "receipts":
                receipts[room_id] = have_sent_room

        # Fetch any stored lists for the connection. The key must be the
        # column *name*, not the value of the connection key.
        list_rows = self.db_pool.simple_select_list_txn(
            txn,
            table="sliding_sync_connection_room_lists",
            keyvalues={
                "connection_key": connection_key,
            },
            retcols=("list_name", "room_id"),
        )
        list_to_rooms: Dict[str, Set[str]] = {}
        for list_name, room_id in list_rows:
            list_to_rooms.setdefault(list_name, set()).add(room_id)

        return PerConnectionStateDB(
            rooms=RoomStatusMap(rooms),
            receipts=RoomStatusMap(receipts),
            room_configs=room_configs,
            list_to_rooms=list_to_rooms,
        )
@attr.s(auto_attribs=True, frozen=True)
class PerConnectionStateDB:
    """The database-shaped counterpart of `PerConnectionState`.

    The principal difference is that the tokens for the different streams are
    held as serialized strings rather than token objects.

    When used for persisting, this holds *only* the updates to the state.
    """

    rooms: "RoomStatusMap[str]"
    receipts: "RoomStatusMap[str]"
    room_configs: Mapping[str, "RoomSyncConfig"]
    list_to_rooms: Mapping[str, AbstractSet[str]]

    @staticmethod
    async def from_state(
        per_connection_state: "MutablePerConnectionState", store: "DataStore"
    ) -> "PerConnectionStateDB":
        """Build the DB representation from the updates of a
        `MutablePerConnectionState`, serializing each stream token to a string.
        """
        room_updates = per_connection_state.rooms.get_updates()
        serialized_rooms: Dict[str, HaveSentRoom[str]] = {}
        for room_id, status in room_updates.items():
            room_token = status.last_token
            serialized_rooms[room_id] = HaveSentRoom(
                status=status.status,
                last_token=(
                    None if room_token is None else await room_token.to_string(store)
                ),
            )

        receipt_updates = per_connection_state.receipts.get_updates()
        serialized_receipts: Dict[str, HaveSentRoom[str]] = {}
        for room_id, status in receipt_updates.items():
            receipt_token = status.last_token
            serialized_receipts[room_id] = HaveSentRoom(
                status=status.status,
                last_token=(
                    None
                    if receipt_token is None
                    else await receipt_token.to_string(store)
                ),
            )

        # Only the first map of each `ChainMap` is used, so that we persist
        # just this request's changes.
        changed_room_configs = per_connection_state.room_configs.maps[0]
        changed_lists = per_connection_state.list_to_rooms.maps[0]

        log_kv(
            {
                "rooms": serialized_rooms,
                "receipts": serialized_receipts,
                "room_configs": changed_room_configs,
            }
        )

        return PerConnectionStateDB(
            rooms=RoomStatusMap(serialized_rooms),
            receipts=RoomStatusMap(serialized_receipts),
            room_configs=changed_room_configs,
            list_to_rooms=changed_lists,
        )

    async def to_state(self, store: "DataStore") -> "PerConnectionState":
        """Build a standard `PerConnectionState`, parsing the stored token
        strings back into token objects.
        """
        parsed_rooms: Dict[str, HaveSentRoom[RoomStreamToken]] = {}
        for room_id, status in self.rooms._statuses.items():
            raw_room_token = status.last_token
            parsed_rooms[room_id] = HaveSentRoom(
                status=status.status,
                last_token=(
                    None
                    if raw_room_token is None
                    else await RoomStreamToken.parse(store, raw_room_token)
                ),
            )

        parsed_receipts: Dict[str, HaveSentRoom[MultiWriterStreamToken]] = {}
        for room_id, status in self.receipts._statuses.items():
            raw_receipt_token = status.last_token
            parsed_receipts[room_id] = HaveSentRoom(
                status=status.status,
                last_token=(
                    None
                    if raw_receipt_token is None
                    else await MultiWriterStreamToken.parse(store, raw_receipt_token)
                ),
            )

        return PerConnectionState(
            rooms=RoomStatusMap(parsed_rooms),
            receipts=RoomStatusMap(parsed_receipts),
            room_configs=self.room_configs,
            list_to_rooms=self.list_to_rooms,
        )

View File

@@ -161,45 +161,80 @@ class StateDeltasStore(SQLBaseStore):
self._get_max_stream_id_in_current_state_deltas_txn,
)
def get_current_state_deltas_for_room_txn(
    self,
    txn: LoggingTransaction,
    room_id: str,
    *,
    from_token: Optional[RoomStreamToken],
    to_token: Optional[RoomStreamToken],
) -> List[StateDelta]:
    """
    Fetch the state deltas of a room that lie between two tokens.

    (> `from_token` and <= `to_token`)

    Args:
        txn: The transaction to run the query in.
        room_id: The room to fetch deltas for.
        from_token: Exclusive lower bound; no lower bound when None.
        to_token: Inclusive upper bound; no upper bound when None.

    Returns:
        The matching deltas, ordered by ascending stream ID.
    """
    query_args: List[object] = [room_id]

    from_clause = ""
    if from_token is not None:
        from_clause = "AND ? < stream_id"
        query_args.append(from_token.stream)

    to_clause = ""
    if to_token is not None:
        to_clause = "AND stream_id <= ?"
        query_args.append(to_token.get_max_stream_pos())

    sql = f"""
            SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
            FROM current_state_delta_stream
            WHERE room_id = ? {from_clause} {to_clause}
            ORDER BY stream_id ASC
        """
    txn.execute(sql, query_args)

    deltas: List[StateDelta] = []
    for (
        instance_name,
        stream_id,
        event_type,
        state_key,
        event_id,
        prev_event_id,
    ) in txn:
        # The SQL bounds are on the raw stream IDs only, so each row is
        # additionally checked against the tokens using its instance name.
        if not _filter_results_by_stream(
            from_token, to_token, instance_name, stream_id
        ):
            continue

        deltas.append(
            StateDelta(
                stream_id=stream_id,
                room_id=room_id,
                event_type=event_type,
                state_key=state_key,
                event_id=event_id,
                prev_event_id=prev_event_id,
            )
        )

    return deltas
@trace
async def get_current_state_deltas_for_room(
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
self,
room_id: str,
*,
from_token: Optional[RoomStreamToken],
to_token: Optional[RoomStreamToken],
) -> List[StateDelta]:
"""Get the state deltas between two tokens."""
"""
Get the state deltas between two tokens.
if not self._curr_state_delta_stream_cache.has_entity_changed(
room_id, from_token.stream
(> `from_token` and <= `to_token`)
"""
if (
from_token is not None
and not self._curr_state_delta_stream_cache.has_entity_changed(
room_id, from_token.stream
)
):
return []
def get_current_state_deltas_for_room_txn(
txn: LoggingTransaction,
) -> List[StateDelta]:
sql = """
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
FROM current_state_delta_stream
WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
"""
txn.execute(
sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
)
return [
StateDelta(
stream_id=row[1],
room_id=room_id,
event_type=row[2],
state_key=row[3],
event_id=row[4],
prev_event_id=row[5],
)
for row in txn
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
]
return await self.db_pool.runInteraction(
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
"get_current_state_deltas_for_room",
self.get_current_state_deltas_for_room_txn,
room_id,
from_token=from_token,
to_token=to_token,
)
@trace

View File

@@ -50,6 +50,7 @@ from typing import (
Dict,
Iterable,
List,
Mapping,
Optional,
Protocol,
Set,
@@ -80,7 +81,7 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
from synapse.util.caches.descriptors import cached
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
@@ -1263,12 +1264,76 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
async def get_last_event_pos_in_room(
    self,
    room_id: str,
    event_types: Optional[StrCollection] = None,
) -> Optional[Tuple[str, PersistedEventPosition]]:
    """
    Look up the most recent event in a room, returning its ID and position.

    Outliers and rejected events are ignored.

    Based on `get_last_event_pos_in_room_before_stream_ordering(...)`

    Args:
        room_id
        event_types: Optional allowlist of event types to filter by

    Returns:
        The ID of the most recent event and its position, or None if there are no
        events in the room that match the given event types.
    """

    def _get_last_event_pos_in_room_txn(
        txn: LoggingTransaction,
    ) -> Optional[Tuple[str, PersistedEventPosition]]:
        # Only restrict by type when a non-empty allowlist was supplied.
        type_filter_sql = ""
        type_filter_args: List[str] = []
        if event_types:
            in_clause, type_filter_args = make_in_list_sql_clause(
                txn.database_engine, "type", event_types
            )
            type_filter_sql = f"AND {in_clause}"

        sql = f"""
            SELECT event_id, stream_ordering, instance_name
            FROM events
            LEFT JOIN rejections USING (event_id)
            WHERE room_id = ?
                {type_filter_sql}
                AND NOT outlier
                AND rejections.event_id IS NULL
            ORDER BY stream_ordering DESC
            LIMIT 1
        """

        txn.execute(
            sql,
            [room_id] + type_filter_args,
        )

        row = cast(Optional[Tuple[str, int, str]], txn.fetchone())
        if row is None:
            return None

        event_id, stream_ordering, instance_name = row
        position = PersistedEventPosition(
            # If instance_name is null we default to "master"
            instance_name or "master",
            stream_ordering,
        )
        return event_id, position

    return await self.db_pool.runInteraction(
        "get_last_event_pos_in_room",
        _get_last_event_pos_in_room_txn,
    )
@trace
async def get_last_event_pos_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
event_types: Optional[Collection[str]] = None,
event_types: Optional[StrCollection] = None,
) -> Optional[Tuple[str, PersistedEventPosition]]:
"""
Returns the ID and event position of the last event in a room at or before a
@@ -1382,7 +1447,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"""
min_token = end_token.stream
max_token = end_token.get_max_stream_pos()
results: Dict[str, int] = {}
# First, we check for the rooms in the stream change cache to see if we
@@ -1395,26 +1459,76 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
else:
missing_room_ids.add(room_id)
if not missing_room_ids:
return results
# Next, we query the stream position from the DB. At first we fetch all
# positions less than the *max* stream pos in the token, then filter
# them down. We do this as a) this is a cheaper query, and b) the vast
# majority of rooms will have a latest token from before the min stream
# pos.
def bulk_get_last_event_pos_txn(
txn: LoggingTransaction, batch_room_ids: StrCollection
uncapped_results = await self._bulk_get_max_event_pos(missing_room_ids)
# Check that the stream position for the rooms are from before the
# minimum position of the token. If not then we need to fetch more
# rows.
recheck_rooms: Set[str] = set()
for room_id, stream in uncapped_results.items():
if stream <= min_token:
results[room_id] = stream
else:
recheck_rooms.add(room_id)
if not recheck_rooms:
return results
for room_id in recheck_rooms:
result = await self.get_last_event_pos_in_room_before_stream_ordering(
room_id, end_token
)
if result is not None:
results[room_id] = result[1].stream
return results
@cached()
async def _get_max_event_pos(self, room_id: str) -> int:
    """Per-room cache entry point for `_bulk_get_max_event_pos`.

    This is never meant to be called directly: it only exists so that the
    `cachedList` on `_bulk_get_max_event_pos` has a cache (keyed by room ID)
    to read from and populate.
    """
    raise NotImplementedError()
@cachedList(cached_method_name="_get_max_event_pos", list_name="room_ids")
async def _bulk_get_max_event_pos(
self, room_ids: StrCollection
) -> Mapping[str, int]:
"""Fetch the max position of a persisted event in the room."""
now_token = self.get_room_max_token()
max_pos = now_token.get_max_stream_pos()
results: Dict[str, int] = {}
missing_room_ids: Set[str] = set()
for room_id in room_ids:
stream_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id)
if stream_pos is not None:
results[room_id] = stream_pos
else:
missing_room_ids.add(room_id)
if not missing_room_ids:
return results
def bulk_get_max_event_pos_txn(
txn: LoggingTransaction, batched_room_ids: StrCollection
) -> Dict[str, int]:
# This query fetches the latest stream position in the rooms before
# the given max position.
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", batch_room_ids
self.database_engine, "room_id", batched_room_ids
)
sql = f"""
SELECT room_id, (
SELECT stream_ordering FROM events AS e
LEFT JOIN rejections USING (event_id)
WHERE e.room_id = r.room_id
AND stream_ordering <= ?
AND e.stream_ordering <= ?
AND NOT outlier
AND rejection_reason IS NULL
ORDER BY stream_ordering DESC
@@ -1423,72 +1537,26 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
FROM rooms AS r
WHERE {clause}
"""
txn.execute(sql, [max_token] + args)
txn.execute(sql, [max_pos] + args)
return {row[0]: row[1] for row in txn}
recheck_rooms: Set[str] = set()
for batched in batch_iter(missing_room_ids, 1000):
result = await self.db_pool.runInteraction(
"bulk_get_last_event_pos_in_room_before_stream_ordering",
bulk_get_last_event_pos_txn,
batched,
for batched in batch_iter(room_ids, 1000):
batch_results = await self.db_pool.runInteraction(
"_bulk_get_max_event_pos", bulk_get_max_event_pos_txn, batched
)
# Check that the stream position for the rooms are from before the
# minimum position of the token. If not then we need to fetch more
# rows.
for room_id, stream in result.items():
if stream <= min_token:
results[room_id] = stream
for room_id, stream_ordering in batch_results.items():
if stream_ordering <= now_token.stream:
results.update(batch_results)
else:
recheck_rooms.add(room_id)
if not recheck_rooms:
return results
# For the remaining rooms we need to fetch all rows between the min and
# max stream positions in the end token, and filter out the rows that
# are after the end token.
#
# This query should be fast as the range between the min and max should
# be small.
def bulk_get_last_event_pos_recheck_txn(
txn: LoggingTransaction, batch_room_ids: StrCollection
) -> Dict[str, int]:
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", batch_room_ids
for room_id in recheck_rooms:
result = await self.get_last_event_pos_in_room_before_stream_ordering(
room_id, now_token
)
sql = f"""
SELECT room_id, instance_name, stream_ordering
FROM events
WHERE ? < stream_ordering AND stream_ordering <= ?
AND NOT outlier
AND rejection_reason IS NULL
AND {clause}
ORDER BY stream_ordering ASC
"""
txn.execute(sql, [min_token, max_token] + args)
# We take the max stream ordering that is less than the token. Since
# we ordered by stream ordering we just need to iterate through and
# take the last matching stream ordering.
txn_results: Dict[str, int] = {}
for row in txn:
room_id = row[0]
event_pos = PersistedEventPosition(row[1], row[2])
if not event_pos.persisted_after(end_token):
txn_results[room_id] = event_pos.stream
return txn_results
for batched in batch_iter(recheck_rooms, 1000):
recheck_result = await self.db_pool.runInteraction(
"bulk_get_last_event_pos_in_room_before_stream_ordering_recheck",
bulk_get_last_event_pos_recheck_txn,
batched,
)
results.update(recheck_result)
if result is not None:
results[room_id] = result[1].stream
return results

View File

@@ -28,6 +28,11 @@ if TYPE_CHECKING:
from synapse.storage.database import LoggingDatabaseConnection
# A string that will be replaced with the appropriate auto increment directive
# for the database engine, expands to an auto incrementing integer primary key.
AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER = "$%AUTO_INCREMENT_PRIMARY_KEY%$"
class IsolationLevel(IntEnum):
READ_COMMITTED: int = 1
REPEATABLE_READ: int = 2

View File

@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Optional, Tuple, cast
import psycopg2.extensions
from synapse.storage.engines._base import (
AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER,
BaseDatabaseEngine,
IncorrectDatabaseSetup,
IsolationLevel,
@@ -256,4 +257,10 @@ class PostgresEngine(
executing the script in its own transaction. The script transaction is
left open and it is the responsibility of the caller to commit it.
"""
# Replace auto increment placeholder with the appropriate directive
script = script.replace(
AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER,
"BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY",
)
cursor.execute(f"COMMIT; BEGIN TRANSACTION; {script}")

View File

@@ -25,6 +25,7 @@ import threading
from typing import TYPE_CHECKING, Any, List, Mapping, Optional
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.engines._base import AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER
from synapse.storage.types import Cursor
if TYPE_CHECKING:
@@ -168,6 +169,11 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
> first. No other implicit transaction control is performed; any transaction
> control must be added to sql_script.
"""
# Replace auto increment placeholder with the appropriate directive
script = script.replace(
AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER, "INTEGER PRIMARY KEY AUTOINCREMENT"
)
# The implementation of `executescript` can be found at
# https://github.com/python/cpython/blob/3.11/Modules/_sqlite/cursor.c#L1035.
cursor.executescript(f"BEGIN TRANSACTION; {script}")

View File

@@ -19,7 +19,7 @@
#
#
SCHEMA_VERSION = 86 # remember to update the list below when updating
SCHEMA_VERSION = 87 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
@@ -142,6 +142,10 @@ Changes in SCHEMA_VERSION = 85
Changes in SCHEMA_VERSION = 86
- Add a column `authenticated` to the tables `local_media_repository` and `remote_media_cache`
Changes in SCHEMA_VERSION = 87
- Add tables to store Sliding Sync data for quick filtering/sorting
(`sliding_sync_joined_rooms`, `sliding_sync_membership_snapshots`)
"""

View File

@@ -0,0 +1,153 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- This table is a list/queue used to keep track of which rooms need to be inserted into
-- `sliding_sync_joined_rooms`. We do this to avoid reading from `current_state_events`
-- during the background update to populate `sliding_sync_joined_rooms` which works but
-- it takes a lot of work for the database to grab `DISTINCT` room_ids given how many
-- state events there are for each room.
CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms_to_recalculate(
room_id TEXT NOT NULL REFERENCES rooms(room_id),
PRIMARY KEY (room_id)
);
-- A table for storing room meta data (current state relevant to sliding sync) that the
-- local server is still participating in (someone local is joined to the room).
--
-- We store the joined rooms in separate table from `sliding_sync_membership_snapshots`
-- because we need up-to-date information for joined rooms and it can be shared across
-- everyone who is joined.
--
-- This table is kept in sync with `current_state_events` which means if the server is
-- no longer participating in a room, the row will be deleted.
CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms(
    room_id TEXT NOT NULL REFERENCES rooms(room_id),
    -- The `stream_ordering` of the most-recent/latest event in the room
    event_stream_ordering BIGINT NOT NULL REFERENCES events(stream_ordering),
    -- The `stream_ordering` of the last event according to the `bump_event_types`
    bump_stamp BIGINT,
    -- `m.room.create` -> `content.type` (current state)
    --
    -- Useful for the `spaces`/`not_spaces` filter in the Sliding Sync API
    room_type TEXT,
    -- `m.room.name` -> `content.name` (current state)
    --
    -- Useful for the room meta data and `room_name_like` filter in the Sliding Sync API
    room_name TEXT,
    -- `m.room.encryption` -> `content.algorithm` (current state)
    --
    -- Useful for the `is_encrypted` filter in the Sliding Sync API
    is_encrypted BOOLEAN DEFAULT FALSE NOT NULL,
    -- `m.room.tombstone` -> `content.replacement_room` (current state)
    --
    -- Useful for the `include_old_rooms` functionality in the Sliding Sync API
    tombstone_successor_room_id TEXT,
    PRIMARY KEY (room_id)
);
-- So we can purge rooms easily.
--
-- The primary key is already `room_id`
-- So we can sort by `stream_ordering`
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_joined_rooms_event_stream_ordering ON sliding_sync_joined_rooms(event_stream_ordering);
-- A table for storing a snapshot of room meta data (historical current state relevant
-- for sliding sync) at the time of a local user's membership. Only has rows for the
-- latest membership event for a given local user in a room which matches
-- `local_current_membership` .
--
-- We store all memberships including joins. This makes it easy to reference this table
-- to find all membership for a given user and shares the same semantics as
-- `local_current_membership`. And we get to avoid some table maintenance; if we only
-- stored non-joins, we would have to delete the row for the user when the user joins
-- the room. Stripped state doesn't include the `m.room.tombstone` event, so we just
-- assume that the room doesn't have a tombstone.
--
-- For remote invite/knocks where the server is not participating in the room, we will
-- use stripped state events to populate this table. We assume that if any stripped
-- state is given, it will include all possible stripped state events types. For
-- example, if stripped state is given but `m.room.encryption` isn't included, we will
-- assume that the room is not encrypted.
--
-- We don't include `bump_stamp` here because we can just use the `stream_ordering` from
-- the membership event itself as the `bump_stamp`.
CREATE TABLE IF NOT EXISTS sliding_sync_membership_snapshots(
room_id TEXT NOT NULL REFERENCES rooms(room_id),
user_id TEXT NOT NULL,
-- Useful to be able to tell leaves from kicks (where the `user_id` is different from the `sender`)
sender TEXT NOT NULL,
membership_event_id TEXT NOT NULL REFERENCES events(event_id),
membership TEXT NOT NULL,
-- This is an integer just to match `room_memberships` and also means we don't need
-- to do any casting.
forgotten INTEGER DEFAULT 0 NOT NULL,
-- `stream_ordering` of the `membership_event_id`
event_stream_ordering BIGINT NOT NULL REFERENCES events(stream_ordering),
-- For remote invites/knocks that don't include any stripped state, we want to be
-- able to distinguish between a room with `None` as valid value for some state and
-- room where the state is completely unknown. Basically, this should be True unless
-- no stripped state was provided for a remote invite/knock (False).
has_known_state BOOLEAN DEFAULT FALSE NOT NULL,
-- `m.room.create` -> `content.type` (according to the current state at the time of
-- the membership).
--
-- Useful for the `spaces`/`not_spaces` filter in the Sliding Sync API
room_type TEXT,
-- `m.room.name` -> `content.name` (according to the current state at the time of
-- the membership).
--
-- Useful for the room meta data and `room_name_like` filter in the Sliding Sync API
room_name TEXT,
-- `m.room.encryption` -> `content.algorithm` (according to the current state at the
-- time of the membership).
--
-- Useful for the `is_encrypted` filter in the Sliding Sync API
is_encrypted BOOLEAN DEFAULT FALSE NOT NULL,
-- `m.room.tombstone` -> `content.replacement_room` (according to the current state at the
-- time of the membership).
--
-- Useful for the `include_old_rooms` functionality in the Sliding Sync API
tombstone_successor_room_id TEXT,
PRIMARY KEY (room_id, user_id)
);
-- So we can purge rooms easily.
--
-- Since we're using a multi-column index as the primary key (room_id, user_id), the
-- first index column (room_id) is always usable for searching so we don't need to
-- create a separate index for it.
--
-- CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_room_id ON sliding_sync_membership_snapshots(room_id);
-- So we can fetch all rooms for a given user
CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_sync_membership_snapshots(user_id);
-- So we can sort by `stream_ordering`
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_event_stream_ordering ON sliding_sync_membership_snapshots(event_stream_ordering);
-- Add a series of background updates to populate the new `sliding_sync_joined_rooms` table:
--
-- 1. Add a background update to prefill `sliding_sync_joined_rooms_to_recalculate`.
-- We do a one-shot bulk insert from the `rooms` table to prefill.
-- 2. Add a background update to populate the new `sliding_sync_joined_rooms` table
--
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8701, 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update', '{}');
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(8701, 'sliding_sync_joined_rooms_bg_update', '{}', 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update');
-- Add a background update to populate the new `sliding_sync_membership_snapshots` table
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8701, 'sliding_sync_membership_snapshots_bg_update', '{}');

View File

@@ -0,0 +1,78 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Table to track active sliding sync connections.
--
-- A new connection will be created for every sliding sync request without a
-- `since` token for a given `conn_id` for a device.
--
-- Once a new connection is created and used we delete all other connections for
-- the `conn_id`.
CREATE TABLE sliding_sync_connections(
connection_key $%AUTO_INCREMENT_PRIMARY_KEY%$,
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
conn_id TEXT NOT NULL,
created_ts BIGINT NOT NULL
);
CREATE INDEX sliding_sync_connections_idx ON sliding_sync_connections(user_id, device_id, conn_id);
-- We track per-connection state by associating changes to the state with
-- connection positions. This ensures that we correctly track state even if we
-- see retries of requests.
--
-- If the client starts a "new" connection (by not specifying a since token),
-- we'll clear out the other connections (to ensure that we don't end up with
-- lots of connection keys).
CREATE TABLE sliding_sync_connection_positions(
connection_position $%AUTO_INCREMENT_PRIMARY_KEY%$,
connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
created_ts BIGINT NOT NULL
);
CREATE INDEX sliding_sync_connection_positions_key ON sliding_sync_connection_positions(connection_key);
-- To save space we deduplicate the `required_state` json by assigning IDs to
-- different values.
CREATE TABLE sliding_sync_connection_required_state(
required_state_id $%AUTO_INCREMENT_PRIMARY_KEY%$,
connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
required_state TEXT NOT NULL -- We store this as a json list of event type / state key tuples.
);
-- Index the `required_state` rows by the connection they belong to. The index
-- must live on `sliding_sync_connection_required_state` (the table it is named
-- after): `sliding_sync_connections.connection_key` is already that table's
-- primary key, whereas the `connection_key` foreign key column here is
-- otherwise unindexed and is what `ON DELETE CASCADE` from
-- `sliding_sync_connections` has to search.
CREATE INDEX sliding_sync_connection_required_state_conn_pos ON sliding_sync_connection_required_state(connection_key);
-- Stores the room configs we have seen for rooms in a connection.
CREATE TABLE sliding_sync_connection_room_configs(
connection_position BIGINT NOT NULL REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE,
room_id TEXT NOT NULL,
timeline_limit BIGINT NOT NULL,
required_state_id BIGINT NOT NULL REFERENCES sliding_sync_connection_required_state(required_state_id)
);
CREATE UNIQUE INDEX sliding_sync_connection_room_configs_idx ON sliding_sync_connection_room_configs(connection_position, room_id);
-- Stores what data we have sent for given streams down given connections.
CREATE TABLE sliding_sync_connection_streams(
connection_position BIGINT NOT NULL REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE,
stream TEXT NOT NULL, -- e.g. "events" or "receipts"
room_id TEXT NOT NULL,
room_status TEXT NOT NULL, -- "live" or "previously", i.e. the `HaveSentRoomFlag` value
last_position TEXT -- For "previously" the token for the stream we have sent up to.
);
CREATE UNIQUE INDEX sliding_sync_connection_streams_idx ON sliding_sync_connection_streams(connection_position, room_id, stream);

View File

@@ -0,0 +1,22 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Stores the room lists for a connection
CREATE TABLE sliding_sync_connection_room_lists(
connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
list_name TEXT NOT NULL,
room_id TEXT NOT NULL
);
CREATE INDEX sliding_sync_connection_room_lists_idx ON sliding_sync_connection_room_lists(connection_key);

View File

@@ -17,33 +17,9 @@
# [This file includes modifications made by New Vector Limited]
#
#
from enum import Enum
from typing import TYPE_CHECKING, Dict, Final, List, Mapping, Optional, Sequence, Tuple
import attr
from typing_extensions import TypedDict
from synapse._pydantic_compat import HAS_PYDANTIC_V2
if TYPE_CHECKING or HAS_PYDANTIC_V2:
from pydantic.v1 import Extra
else:
from pydantic import Extra
from synapse.events import EventBase
from synapse.types import (
DeviceListUpdates,
JsonDict,
JsonMapping,
Requester,
SlidingSyncStreamToken,
StreamToken,
UserID,
)
from synapse.types.rest.client import SlidingSyncBody
if TYPE_CHECKING:
from synapse.handlers.relations import BundledAggregations
from typing import List, Optional, TypedDict
class ShutdownRoomParams(TypedDict):
@@ -101,335 +77,3 @@ class ShutdownRoomResponse(TypedDict):
failed_to_kick_users: List[str]
local_aliases: List[str]
new_room_id: Optional[str]
class SlidingSyncConfig(SlidingSyncBody):
    """
    Inherit from `SlidingSyncBody` since we need all of the same fields and add a few
    extra fields that we need in the handler.

    Attributes:
        user: A `UserID`; presumably the user the sync request is for (TODO confirm
            against the caller).
        requester: A `Requester`; presumably the authenticated requester of the
            sync request (TODO confirm against the caller).
    """

    user: UserID
    requester: Requester

    # Pydantic config
    class Config:
        # By default, ignore fields that we don't recognise.
        extra = Extra.ignore
        # By default, don't allow fields to be reassigned after parsing.
        allow_mutation = False
        # Allow custom types like `UserID` to be used in the model
        arbitrary_types_allowed = True
class OperationType(Enum):
    """
    Represents the operation types in a Sliding Sync window.

    Attributes:
        SYNC: Sets a range of entries. Clients SHOULD discard what they previously
            knew about entries in this range.
        INSERT: Sets a single entry. If the position is not empty then clients MUST move
            entries to the left or the right depending on where the closest empty space is.
        DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
            places.
        INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
            offline support, but they should be treated as empty when additional operations
            which concern indexes in the range arrive from the server.
    """

    SYNC: Final = "SYNC"
    INSERT: Final = "INSERT"
    DELETE: Final = "DELETE"
    INVALIDATE: Final = "INVALIDATE"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingSyncResult:
    """
    The Sliding Sync result to be serialized to JSON for a response.

    Attributes:
        next_pos: The next position token in the sliding window to request (next_batch).
        lists: Sliding window API. A map of list key to list results.
        rooms: Room subscription API. A map of room ID to room results.
        extensions: Extensions API. A map of extension key to extension results.
    """

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class RoomResult:
        """
        Attributes:
            name: Room name or calculated room name.
            avatar: Room avatar
            heroes: List of stripped membership events (containing `user_id` and optionally
                `avatar_url` and `displayname`) for the users used to calculate the room name.
            is_dm: Flag to specify whether the room is a direct-message room (most likely
                between two people).
            initial: Flag which is set when this is the first time the server is sending this
                data on this connection. Clients can use this flag to replace or update
                their local state. When there is an update, servers MUST omit this flag
                entirely and NOT send "initial":false as this is wasteful on bandwidth. The
                absence of this flag means 'false'.
            unstable_expanded_timeline: Flag which is set if we're returning more historic
                events due to the timeline limit having increased. See "XXX: Odd behavior"
                comment in `synapse.handlers.sliding_sync`.
            required_state: The current state of the room
            timeline_events: Latest events in the room. The last event is the most recent.
            bundled_aggregations: A mapping of event ID to the bundled aggregations for
                the timeline events above. This allows clients to show accurate reaction
                counts (or edits, threads), even if some of the reaction events were skipped
                over in a gappy sync.
            stripped_state: Stripped state events (for rooms where the user is
                invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
                absent on joined/left rooms
            prev_batch: A token that can be passed as a start parameter to the
                `/rooms/<room_id>/messages` API to retrieve earlier messages.
            limited: True if there are more events than `timeline_limit` looking
                backwards from the `response.pos` to the `request.pos`.
            num_live: The number of timeline events which have just occurred and are not historical.
                The last N events are 'live' and should be treated as such. This is mostly
                useful to determine whether a given @mention event should make a noise or not.
                Clients cannot rely solely on the absence of `initial: true` to determine live
                events because if a room not in the sliding window bumps into the window because
                of an @mention it will have `initial: true` yet contain a single live event
                (with potentially other old events in the timeline).
            bump_stamp: The `stream_ordering` of the last event according to the
                `bump_event_types`. This helps clients sort more readily without them
                needing to pull in a bunch of the timeline to determine the last activity.
                `bump_event_types` is a thing because for example, we don't want display
                name changes to mark the room as unread and bump it to the top. For
                encrypted rooms, we just have to consider any activity as a bump because we
                can't see the content and the client has to figure it out for themselves.
            joined_count: The number of users with membership of join, including the client's
                own user ID. (same as sync v2 `m.joined_member_count`)
            invited_count: The number of users with membership of invite. (same as sync v2
                `m.invited_member_count`)
            notification_count: The total number of unread notifications for this room. (same
                as sync v2)
            highlight_count: The number of unread notifications for this room with the highlight
                flag set. (same as sync v2)
        """

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class StrippedHero:
            # A pared-down view of a hero's membership: just the user ID plus the
            # optional profile fields.
            user_id: str
            display_name: Optional[str]
            avatar_url: Optional[str]

        name: Optional[str]
        avatar: Optional[str]
        heroes: Optional[List[StrippedHero]]
        is_dm: bool
        initial: bool
        unstable_expanded_timeline: bool
        # Should be empty for invite/knock rooms with `stripped_state`
        required_state: List[EventBase]
        # Should be empty for invite/knock rooms with `stripped_state`
        timeline_events: List[EventBase]
        bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
        # Only relevant to invite/knock rooms. Note that the annotation is a plain
        # list (not `Optional`), so presumably an empty list is used for other
        # rooms -- TODO confirm against the handler.
        stripped_state: List[JsonDict]
        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
        prev_batch: Optional[StreamToken]
        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
        limited: Optional[bool]
        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
        num_live: Optional[int]
        bump_stamp: int
        joined_count: int
        invited_count: int
        notification_count: int
        highlight_count: int

        def __bool__(self) -> bool:
            # Truthy when the room is being sent for the first time (`initial`) or
            # carries any required state, timeline events or stripped state.
            return (
                # If this is the first time the client is seeing the room, we should not filter it out
                # under any circumstance.
                self.initial
                # We need to let the client know if there are any new events
                or bool(self.required_state)
                or bool(self.timeline_events)
                or bool(self.stripped_state)
            )

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class SlidingWindowList:
        """
        Attributes:
            count: The total number of entries in the list. Always present if this list
                is.
            ops: The sliding list operations to perform.
        """

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class Operation:
            """
            Attributes:
                op: The operation type to perform.
                range: Which index positions are affected by this operation. These are
                    both inclusive.
                room_ids: Which room IDs are affected by this operation. These IDs match
                    up to the positions in the `range`, so the last room ID in this list
                    matches the 9th index. The room data is held in a separate object.
            """

            op: OperationType
            range: Tuple[int, int]
            room_ids: List[str]

        count: int
        ops: List[Operation]

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class Extensions:
        """Responses for extensions

        Attributes:
            to_device: The to-device extension (MSC3885)
            e2ee: The E2EE device extension (MSC3884)
            account_data: The Account Data extension (MSC3959)
            receipts: The Receipts extension (MSC3960)
            typing: The Typing Notification extension (MSC3961)
        """

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class ToDeviceExtension:
            """The to-device extension (MSC3885)

            Attributes:
                next_batch: The to-device stream token the client should use
                    to get more results
                events: A list of to-device messages for the client
            """

            next_batch: str
            events: Sequence[JsonMapping]

            def __bool__(self) -> bool:
                # Only truthy when there are to-device messages; `next_batch` alone
                # does not count.
                return bool(self.events)

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class E2eeExtension:
            """The E2EE device extension (MSC3884)

            Attributes:
                device_list_updates: List of user_ids whose devices have changed or left (only
                    present on incremental syncs).
                device_one_time_keys_count: Map from key algorithm to the number of
                    unclaimed one-time keys currently held on the server for this device. If
                    an algorithm is unlisted, the count for that algorithm is assumed to be
                    zero. If this entire parameter is missing, the count for all algorithms
                    is assumed to be zero.
                device_unused_fallback_key_types: List of unused fallback key algorithms
                    for this device.
            """

            # Only present on incremental syncs
            device_list_updates: Optional[DeviceListUpdates]
            device_one_time_keys_count: Mapping[str, int]
            device_unused_fallback_key_types: Sequence[str]

            def __bool__(self) -> bool:
                # Note that "signed_curve25519" is always returned in key count responses
                # regardless of whether we uploaded any keys for it. This is necessary until
                # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
                #
                # Also related:
                # https://github.com/element-hq/element-android/issues/3725 and
                # https://github.com/matrix-org/synapse/issues/10456
                default_otk = self.device_one_time_keys_count.get("signed_curve25519")
                # There is something beyond the always-present default entry if more
                # than one algorithm is listed, or the default one has a non-zero count.
                more_than_default_otk = len(self.device_one_time_keys_count) > 1 or (
                    default_otk is not None and default_otk > 0
                )

                return bool(
                    more_than_default_otk
                    or self.device_list_updates
                    or self.device_unused_fallback_key_types
                )

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class AccountDataExtension:
            """The Account Data extension (MSC3959)

            Attributes:
                global_account_data_map: Mapping from `type` to `content` of global account
                    data events.
                account_data_by_room_map: Mapping from room_id to mapping of `type` to
                    `content` of room account data events.
            """

            global_account_data_map: Mapping[str, JsonMapping]
            account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]

            def __bool__(self) -> bool:
                # Truthy when either the global or the per-room map is non-empty.
                return bool(
                    self.global_account_data_map or self.account_data_by_room_map
                )

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class ReceiptsExtension:
            """The Receipts extension (MSC3960)

            Attributes:
                room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
                    event (type, content)
            """

            room_id_to_receipt_map: Mapping[str, JsonMapping]

            def __bool__(self) -> bool:
                # Truthy when there is at least one room with a receipt event.
                return bool(self.room_id_to_receipt_map)

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class TypingExtension:
            """The Typing Notification extension (MSC3961)

            Attributes:
                room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
                    event (type, content)
            """

            room_id_to_typing_map: Mapping[str, JsonMapping]

            def __bool__(self) -> bool:
                # Truthy when there is at least one room with a typing event.
                return bool(self.room_id_to_typing_map)

        to_device: Optional[ToDeviceExtension] = None
        e2ee: Optional[E2eeExtension] = None
        account_data: Optional[AccountDataExtension] = None
        receipts: Optional[ReceiptsExtension] = None
        typing: Optional[TypingExtension] = None

        def __bool__(self) -> bool:
            # Truthy when any individual extension response is present and truthy.
            return bool(
                self.to_device
                or self.e2ee
                or self.account_data
                or self.receipts
                or self.typing
            )

    next_pos: SlidingSyncStreamToken
    lists: Dict[str, SlidingWindowList]
    rooms: Dict[str, RoomResult]
    extensions: Extensions

    def __bool__(self) -> bool:
        """Make the result appear empty if there are no updates. This is used
        to tell if the notifier needs to wait for more events when polling for
        events.
        """
        # We don't include `self.lists` here, as a) `lists` is always non-empty even if
        # there are no changes, and b) since we're sorting rooms by `stream_ordering` of
        # the latest activity, anything that would cause the order to change would end
        # up in `self.rooms` and cause us to send down the change.
        return bool(self.rooms or self.extensions)

    @staticmethod
    def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
        "Return a new empty result"
        # No lists, no rooms and a default `Extensions()` (all extensions unset),
        # so the returned result is falsy per `__bool__` above.
        return SlidingSyncResult(
            next_pos=next_pos,
            lists={},
            rooms={},
            extensions=SlidingSyncResult.Extensions(),
        )

View File

@@ -18,29 +18,393 @@ from collections import ChainMap
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Callable,
Dict,
Final,
Generic,
List,
Mapping,
MutableMapping,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
cast,
)
import attr
from synapse._pydantic_compat import HAS_PYDANTIC_V2
from synapse.api.constants import EventTypes
from synapse.types import MultiWriterStreamToken, RoomStreamToken, StrCollection, UserID
from synapse.types.handlers import SlidingSyncConfig
if TYPE_CHECKING or HAS_PYDANTIC_V2:
from pydantic.v1 import Extra
else:
from pydantic import Extra
from synapse.events import EventBase
from synapse.types import (
DeviceListUpdates,
JsonDict,
JsonMapping,
Requester,
SlidingSyncStreamToken,
StreamToken,
)
from synapse.types.rest.client import SlidingSyncBody
if TYPE_CHECKING:
pass
from synapse.handlers.relations import BundledAggregations
logger = logging.getLogger(__name__)
# Sliding Sync: The event types that clients should consider as new activity and affect
# the `bump_stamp`
SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES = {
EventTypes.Create,
EventTypes.Message,
EventTypes.Encrypted,
EventTypes.Sticker,
EventTypes.CallInvite,
EventTypes.PollStart,
EventTypes.LiveLocationShareStart,
}
class SlidingSyncConfig(SlidingSyncBody):
    """
    Inherit from `SlidingSyncBody` since we need all of the same fields and add a few
    extra fields that we need in the handler.

    Attributes:
        user: A `UserID`; presumably the user the sync request is for (TODO confirm
            against the caller).
        requester: A `Requester`; presumably the authenticated requester of the
            sync request (TODO confirm against the caller).
    """

    user: UserID
    requester: Requester

    # Pydantic config
    class Config:
        # By default, ignore fields that we don't recognise.
        extra = Extra.ignore
        # By default, don't allow fields to be reassigned after parsing.
        allow_mutation = False
        # Allow custom types like `UserID` to be used in the model
        arbitrary_types_allowed = True
class OperationType(Enum):
    """
    Represents the operation types in a Sliding Sync window.

    Attributes:
        SYNC: Sets a range of entries. Clients SHOULD discard what they previously
            knew about entries in this range.
        INSERT: Sets a single entry. If the position is not empty then clients MUST move
            entries to the left or the right depending on where the closest empty space is.
        DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
            places.
        INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
            offline support, but they should be treated as empty when additional operations
            which concern indexes in the range arrive from the server.
    """

    SYNC: Final = "SYNC"
    INSERT: Final = "INSERT"
    DELETE: Final = "DELETE"
    INVALIDATE: Final = "INVALIDATE"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingSyncResult:
    """
    The Sliding Sync result to be serialized to JSON for a response.

    Attributes:
        next_pos: The next position token in the sliding window to request (next_batch).
        lists: Sliding window API. A map of list key to list results.
        rooms: Room subscription API. A map of room ID to room results.
        extensions: Extensions API. A map of extension key to extension results.
    """

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class RoomResult:
        """
        The result for a single room (a value of `SlidingSyncResult.rooms`).

        Attributes:
            name: Room name or calculated room name.
            avatar: Room avatar
            heroes: List of stripped membership events (containing `user_id` and optionally
                `avatar_url` and `displayname`) for the users used to calculate the room name.
            is_dm: Flag to specify whether the room is a direct-message room (most likely
                between two people).
            initial: Flag which is set when this is the first time the server is sending this
                data on this connection. Clients can use this flag to replace or update
                their local state. When there is an update, servers MUST omit this flag
                entirely and NOT send "initial":false as this is wasteful on bandwidth. The
                absence of this flag means 'false'.
            unstable_expanded_timeline: Flag which is set if we're returning more historic
                events due to the timeline limit having increased. See "XXX: Odd behavior"
                comment in `synapse.handlers.sliding_sync`.
            required_state: The current state of the room
            timeline_events: Latest events in the room. The last event is the most recent.
            bundled_aggregations: A mapping of event ID to the bundled aggregations for
                the timeline events above. This allows clients to show accurate reaction
                counts (or edits, threads), even if some of the reaction events were skipped
                over in a gappy sync.
            stripped_state: Stripped state events (for rooms where the user is
                invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
                absent on joined/left rooms
            prev_batch: A token that can be passed as a start parameter to the
                `/rooms/<room_id>/messages` API to retrieve earlier messages.
            limited: True if there are more events than `timeline_limit` looking
                backwards from the `response.pos` to the `request.pos`.
            num_live: The number of timeline events which have just occurred and are not historical.
                The last N events are 'live' and should be treated as such. This is mostly
                useful to determine whether a given @mention event should make a noise or not.
                Clients cannot rely solely on the absence of `initial: true` to determine live
                events because if a room not in the sliding window bumps into the window because
                of an @mention it will have `initial: true` yet contain a single live event
                (with potentially other old events in the timeline).
            bump_stamp: The `stream_ordering` of the last event according to the
                `bump_event_types`. This helps clients sort more readily without them
                needing to pull in a bunch of the timeline to determine the last activity.
                `bump_event_types` is a thing because for example, we don't want display
                name changes to mark the room as unread and bump it to the top. For
                encrypted rooms, we just have to consider any activity as a bump because we
                can't see the content and the client has to figure it out for themselves.
            joined_count: The number of users with membership of join, including the client's
                own user ID. (same as sync v2 `m.joined_member_count`)
            invited_count: The number of users with membership of invite. (same as sync v2
                `m.invited_member_count`)
            notification_count: The total number of unread notifications for this room. (same
                as sync v2)
            highlight_count: The number of unread notifications for this room with the highlight
                flag set. (same as sync v2)
        """

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class StrippedHero:
            """The stripped membership details of one hero (see `heroes` on `RoomResult`)."""

            user_id: str
            display_name: Optional[str]
            avatar_url: Optional[str]

        name: Optional[str]
        avatar: Optional[str]
        heroes: Optional[List[StrippedHero]]
        is_dm: bool
        initial: bool
        unstable_expanded_timeline: bool
        # Should be empty for invite/knock rooms with `stripped_state`
        required_state: List[EventBase]
        # Should be empty for invite/knock rooms with `stripped_state`
        timeline_events: List[EventBase]
        bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
        # Only relevant to invite/knock rooms.
        # NOTE(review): this is typed as a plain `List` (not `Optional`), so presumably
        # it is an empty list for other rooms — confirm against the callers.
        stripped_state: List[JsonDict]
        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
        prev_batch: Optional[StreamToken]
        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
        limited: Optional[bool]
        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
        num_live: Optional[int]
        bump_stamp: int
        joined_count: int
        invited_count: int
        notification_count: int
        highlight_count: int

        def __bool__(self) -> bool:
            """Return True if this room result carries anything worth sending."""
            return (
                # If this is the first time the client is seeing the room, we should not filter it out
                # under any circumstance.
                self.initial
                # We need to let the client know if there are any new events
                or bool(self.required_state)
                or bool(self.timeline_events)
                or bool(self.stripped_state)
            )

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class SlidingWindowList:
        """
        Attributes:
            count: The total number of entries in the list. Always present if this list
                is.
            ops: The sliding list operations to perform.
        """

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class Operation:
            """
            Attributes:
                op: The operation type to perform.
                range: Which index positions are affected by this operation. These are
                    both inclusive.
                room_ids: Which room IDs are affected by this operation. These IDs match
                    up to the positions in the `range`, so the last room ID in this list
                    matches the index at the end of the `range`. The room data is held
                    in a separate object.
            """

            op: OperationType
            range: Tuple[int, int]
            room_ids: List[str]

        count: int
        ops: List[Operation]

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class Extensions:
        """Responses for extensions

        Attributes:
            to_device: The to-device extension (MSC3885)
            e2ee: The E2EE device extension (MSC3884)
            account_data: The Account Data extension (MSC3959)
            receipts: The Receipts extension (MSC3960)
            typing: The Typing Notification extension (MSC3961)
        """

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class ToDeviceExtension:
            """The to-device extension (MSC3885)

            Attributes:
                next_batch: The to-device stream token the client should use
                    to get more results
                events: A list of to-device messages for the client
            """

            next_batch: str
            events: Sequence[JsonMapping]

            def __bool__(self) -> bool:
                # Only the events count; `next_batch` alone is not an update.
                return bool(self.events)

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class E2eeExtension:
            """The E2EE device extension (MSC3884)

            Attributes:
                device_list_updates: List of user_ids whose devices have changed or left (only
                    present on incremental syncs).
                device_one_time_keys_count: Map from key algorithm to the number of
                    unclaimed one-time keys currently held on the server for this device. If
                    an algorithm is unlisted, the count for that algorithm is assumed to be
                    zero. If this entire parameter is missing, the count for all algorithms
                    is assumed to be zero.
                device_unused_fallback_key_types: List of unused fallback key algorithms
                    for this device.
            """

            # Only present on incremental syncs
            device_list_updates: Optional[DeviceListUpdates]
            device_one_time_keys_count: Mapping[str, int]
            device_unused_fallback_key_types: Sequence[str]

            def __bool__(self) -> bool:
                # Note that "signed_curve25519" is always returned in key count responses
                # regardless of whether we uploaded any keys for it. This is necessary until
                # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
                #
                # Also related:
                # https://github.com/element-hq/element-android/issues/3725 and
                # https://github.com/matrix-org/synapse/issues/10456
                #
                # NOTE(review): a map holding exactly one algorithm that is NOT
                # "signed_curve25519" is treated as empty by the check below (the
                # length is 1 and `default_otk` is None) — confirm that is intended.
                default_otk = self.device_one_time_keys_count.get("signed_curve25519")
                more_than_default_otk = len(self.device_one_time_keys_count) > 1 or (
                    default_otk is not None and default_otk > 0
                )

                return bool(
                    more_than_default_otk
                    or self.device_list_updates
                    or self.device_unused_fallback_key_types
                )

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class AccountDataExtension:
            """The Account Data extension (MSC3959)

            Attributes:
                global_account_data_map: Mapping from `type` to `content` of global account
                    data events.
                account_data_by_room_map: Mapping from room_id to mapping of `type` to
                    `content` of room account data events.
            """

            global_account_data_map: Mapping[str, JsonMapping]
            account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]

            def __bool__(self) -> bool:
                return bool(
                    self.global_account_data_map or self.account_data_by_room_map
                )

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class ReceiptsExtension:
            """The Receipts extension (MSC3960)

            Attributes:
                room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
                    event (type, content)
            """

            room_id_to_receipt_map: Mapping[str, JsonMapping]

            def __bool__(self) -> bool:
                return bool(self.room_id_to_receipt_map)

        @attr.s(slots=True, frozen=True, auto_attribs=True)
        class TypingExtension:
            """The Typing Notification extension (MSC3961)

            Attributes:
                room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
                    event (type, content)
            """

            room_id_to_typing_map: Mapping[str, JsonMapping]

            def __bool__(self) -> bool:
                return bool(self.room_id_to_typing_map)

        to_device: Optional[ToDeviceExtension] = None
        e2ee: Optional[E2eeExtension] = None
        account_data: Optional[AccountDataExtension] = None
        receipts: Optional[ReceiptsExtension] = None
        typing: Optional[TypingExtension] = None

        def __bool__(self) -> bool:
            # Truthy as soon as any individual extension response is truthy; each
            # extension defines its own `__bool__` above.
            return bool(
                self.to_device
                or self.e2ee
                or self.account_data
                or self.receipts
                or self.typing
            )

    next_pos: SlidingSyncStreamToken
    lists: Dict[str, SlidingWindowList]
    rooms: Dict[str, RoomResult]
    extensions: Extensions

    def __bool__(self) -> bool:
        """Make the result appear empty if there are no updates. This is used
        to tell if the notifier needs to wait for more events when polling for
        events.
        """
        # We don't include `self.lists` here, as a) `lists` is always non-empty even if
        # there are no changes, and b) since we're sorting rooms by `stream_ordering` of
        # the latest activity, anything that would cause the order to change would end
        # up in `self.rooms` and cause us to send down the change.
        return bool(self.rooms or self.extensions)

    @staticmethod
    def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
        """Return a new empty result: no lists, no rooms and default extensions,
        positioned at `next_pos`.
        """
        return SlidingSyncResult(
            next_pos=next_pos,
            lists={},
            rooms={},
            extensions=SlidingSyncResult.Extensions(),
        )
class StateValues:
"""
@@ -60,7 +424,7 @@ class StateValues:
# We can't freeze this class because we want to update it in place with the
# de-duplicated data.
@attr.s(slots=True, auto_attribs=True)
@attr.s(slots=True, auto_attribs=True, frozen=True)
class RoomSyncConfig:
"""
Holds the config for what data we should fetch for a room in the sync response.
@@ -74,7 +438,7 @@ class RoomSyncConfig:
"""
timeline_limit: int
required_state_map: Dict[str, Set[str]]
required_state_map: Mapping[str, AbstractSet[str]]
@classmethod
def from_room_config(
@@ -148,7 +512,7 @@ class RoomSyncConfig:
def deep_copy(self) -> "RoomSyncConfig":
required_state_map: Dict[str, Set[str]] = {
state_type: state_key_set.copy()
state_type: set(state_key_set)
for state_type, state_key_set in self.required_state_map.items()
}
@@ -159,14 +523,20 @@ class RoomSyncConfig:
def combine_room_sync_config(
self, other_room_sync_config: "RoomSyncConfig"
) -> None:
) -> "RoomSyncConfig":
"""
Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the
Combine this `RoomSyncConfig` with another `RoomSyncConfig` and return the
superset union of the two.
"""
timeline_limit = self.timeline_limit
required_state_map = {
event_type: set(state_keys)
for event_type, state_keys in self.required_state_map.items()
}
# Take the highest timeline limit
if self.timeline_limit < other_room_sync_config.timeline_limit:
self.timeline_limit = other_room_sync_config.timeline_limit
timeline_limit = other_room_sync_config.timeline_limit
# Union the required state
for (
@@ -175,14 +545,14 @@ class RoomSyncConfig:
) in other_room_sync_config.required_state_map.items():
# If we already have a wildcard for everything, we don't need to add
# anything else
if StateValues.WILDCARD in self.required_state_map.get(
if StateValues.WILDCARD in required_state_map.get(
StateValues.WILDCARD, set()
):
break
# If we already have a wildcard `state_key` for this `state_type`, we don't need
# to add anything else
if StateValues.WILDCARD in self.required_state_map.get(state_type, set()):
if StateValues.WILDCARD in required_state_map.get(state_type, set()):
continue
# If we're getting wildcards for the `state_type` and `state_key`, that's
@@ -191,16 +561,14 @@ class RoomSyncConfig:
state_type == StateValues.WILDCARD
and StateValues.WILDCARD in state_key_set
):
self.required_state_map = {state_type: {StateValues.WILDCARD}}
required_state_map = {state_type: {StateValues.WILDCARD}}
# We can break, since we don't need to add anything else
break
for state_key in state_key_set:
# If we already have a wildcard for this specific `state_key`, we don't need
# to add it since the wildcard already covers it.
if state_key in self.required_state_map.get(
StateValues.WILDCARD, set()
):
if state_key in required_state_map.get(StateValues.WILDCARD, set()):
continue
# If we're getting a wildcard for the `state_type`, get rid of any other
@@ -211,7 +579,7 @@ class RoomSyncConfig:
# Make a copy so we don't run into an error: `dictionary changed size
# during iteration`, when we remove items
for existing_state_type, existing_state_key_set in list(
self.required_state_map.items()
required_state_map.items()
):
# Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items
@@ -221,19 +589,21 @@ class RoomSyncConfig:
# If we've the left the `set()` empty, remove it from the map
if existing_state_key_set == set():
self.required_state_map.pop(existing_state_type, None)
required_state_map.pop(existing_state_type, None)
# If we're getting a wildcard `state_key`, get rid of any other state_keys
# for this `state_type` since the wildcard will cover it already.
if state_key == StateValues.WILDCARD:
self.required_state_map[state_type] = {state_key}
required_state_map[state_type] = {state_key}
break
# Otherwise, just add it to the set
else:
if self.required_state_map.get(state_type) is None:
self.required_state_map[state_type] = {state_key}
if required_state_map.get(state_type) is None:
required_state_map[state_type] = {state_key}
else:
self.required_state_map[state_type].add(state_key)
required_state_map[state_type].add(state_key)
return RoomSyncConfig(timeline_limit, required_state_map)
def must_await_full_state(
self,
@@ -324,7 +694,7 @@ class HaveSentRoomFlag(Enum):
LIVE = "live"
T = TypeVar("T")
T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken)
@attr.s(auto_attribs=True, slots=True, frozen=True)
@@ -383,6 +753,9 @@ class RoomStatusMap(Generic[T]):
return RoomStatusMap(statuses=dict(self._statuses))
def __len__(self) -> int:
    """Return the number of entries in the underlying `_statuses` map."""
    return len(self._statuses)
class MutableRoomStatusMap(RoomStatusMap[T]):
"""A mutable version of `RoomStatusMap`"""
@@ -439,7 +812,7 @@ class MutableRoomStatusMap(RoomStatusMap[T]):
self._statuses[room_id] = HaveSentRoom.previously(from_token)
@attr.s(auto_attribs=True)
@attr.s(auto_attribs=True, frozen=True)
class PerConnectionState:
"""The per-connection state. A snapshot of what we've sent down the
connection before.
@@ -467,14 +840,18 @@ class PerConnectionState:
room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
list_to_rooms: Mapping[str, AbstractSet[str]] = attr.Factory(dict)
def get_mutable(self) -> "MutablePerConnectionState":
    """Build a `MutablePerConnectionState` layered on top of this state.

    `rooms` and `receipts` are obtained from their own `get_mutable()`. The room
    configs and list-to-rooms mappings are not copied: each is wrapped in a
    `ChainMap` whose first map is a fresh, empty dict and whose second map is the
    existing mapping from this state.
    """
    # The `cast(...)` calls only change the static type seen by the type checker
    # so that the existing mappings can be passed to `ChainMap`.
    return MutablePerConnectionState(
        rooms=self.rooms.get_mutable(),
        receipts=self.receipts.get_mutable(),
        room_configs=ChainMap(
            {},
            cast(MutableMapping[str, RoomSyncConfig], self.room_configs),
        ),
        list_to_rooms=ChainMap(
            {},
            cast(MutableMapping[str, Set[str]], self.list_to_rooms),
        ),
    )
def copy(self) -> "PerConnectionState":
@@ -482,6 +859,15 @@ class PerConnectionState:
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
room_configs=dict(self.room_configs),
list_to_rooms=dict(self.list_to_rooms),
)
def __len__(self) -> int:
    """Return the combined size of all four pieces of per-connection state.

    This is the sum of the lengths of `rooms`, `receipts`, `room_configs` and
    `list_to_rooms`.
    """
    tracked_parts = (
        self.rooms,
        self.receipts,
        self.room_configs,
        self.list_to_rooms,
    )
    return sum(len(part) for part in tracked_parts)
@@ -494,13 +880,20 @@ class MutablePerConnectionState(PerConnectionState):
room_configs: typing.ChainMap[str, RoomSyncConfig]
list_to_rooms: typing.ChainMap[str, Set[str]]
def has_updates(self) -> bool:
    """Return whether any part of this mutable state has pending updates.

    Returns:
        True if the room statuses, the receipt statuses, the room configs or the
        list-to-rooms mapping report at least one update; False otherwise.
    """
    return (
        bool(self.rooms.get_updates())
        or bool(self.receipts.get_updates())
        or bool(self.get_room_config_updates())
        # Go through the accessor (as is done for the room configs above) rather
        # than reaching into the `ChainMap` internals here as well.
        or bool(self.get_list_to_rooms_updates())
    )
def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
    """Get updates to the room sync config

    Returns the first map of the `room_configs` `ChainMap`. When this object was
    created via `PerConnectionState.get_mutable()`, that first map starts out
    empty and sits on top of the previous state's mapping.
    """
    return self.room_configs.maps[0]
def get_list_to_rooms_updates(self) -> Mapping[str, StrCollection]:
    """Get updates to the `list_to_rooms`

    Returns the first map of the `list_to_rooms` `ChainMap`. When this object was
    created via `PerConnectionState.get_mutable()`, that first map starts out
    empty and sits on top of the previous state's mapping.
    """
    return self.list_to_rooms.maps[0]

View File

@@ -18,7 +18,6 @@
#
#
import logging
from copy import deepcopy
from typing import Dict, List, Optional
from unittest.mock import patch
@@ -47,7 +46,7 @@ from synapse.rest.client import knock, login, room
from synapse.server import HomeServer
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict, StreamToken, UserID
from synapse.types.handlers import SlidingSyncConfig
from synapse.types.handlers.sliding_sync import SlidingSyncConfig
from synapse.util import Clock
from tests.replication._base import BaseMultiWorkerStreamTestCase
@@ -566,23 +565,11 @@ class RoomSyncConfigTestCase(TestCase):
"""
Combine A into B and B into A to make sure we get the same result.
"""
# Since we're mutating these in place, make a copy for each of our trials
room_sync_config_a = deepcopy(a)
room_sync_config_b = deepcopy(b)
combined_config = a.combine_room_sync_config(b)
self._assert_room_config_equal(combined_config, expected, "B into A")
# Combine B into A
room_sync_config_a.combine_room_sync_config(room_sync_config_b)
self._assert_room_config_equal(room_sync_config_a, expected, "B into A")
# Since we're mutating these in place, make a copy for each of our trials
room_sync_config_a = deepcopy(a)
room_sync_config_b = deepcopy(b)
# Combine A into B
room_sync_config_b.combine_room_sync_config(room_sync_config_a)
self._assert_room_config_equal(room_sync_config_b, expected, "A into B")
# Combine A into B: the reverse direction of the trial above, so that we
# actually check the combination gives the same result both ways round.
combined_config = b.combine_room_sync_config(a)
self._assert_room_config_equal(combined_config, expected, "A into B")
class GetRoomMembershipForUserAtToTokenTestCase(HomeserverTestCase):

View File

@@ -16,7 +16,7 @@ import logging
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.room_versions import RoomVersions
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
@@ -44,6 +44,10 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.storage_controllers = hs.get_storage_controllers()
self.state_handler = self.hs.get_state_handler()
persistence = self.hs.get_storage_controllers().persistence
assert persistence is not None
self.persistence = persistence
def test_rooms_meta_when_joined(self) -> None:
"""
@@ -600,16 +604,16 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
Test that `bump_stamp` ignores backfilled events, i.e. events with a
negative stream ordering.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
# Create a remote room
creator = "@user:other"
room_id = "!foo:other"
room_version = RoomVersions.V10
shared_kwargs = {
"room_id": room_id,
"room_version": "10",
"room_version": room_version.identifier,
}
create_tuple = self.get_success(
@@ -618,6 +622,12 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
prev_event_ids=[],
type=EventTypes.Create,
state_key="",
content={
# The `ROOM_CREATOR` field could be removed if we used a room
# version > 10 (in favor of relying on `sender`)
EventContentFields.ROOM_CREATOR: creator,
EventContentFields.ROOM_VERSION: room_version.identifier,
},
sender=creator,
**shared_kwargs,
)
@@ -667,22 +677,29 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
]
# Ensure the local HS knows the room version
self.get_success(
self.store.store_room(room_id, creator, False, RoomVersions.V10)
)
self.get_success(self.store.store_room(room_id, creator, False, room_version))
# Persist these events as backfilled events.
persistence = self.hs.get_storage_controllers().persistence
assert persistence is not None
for event, context in remote_events_and_contexts:
self.get_success(persistence.persist_event(event, context, backfilled=True))
self.get_success(
self.persistence.persist_event(event, context, backfilled=True)
)
# Now we join the local user to the room
join_tuple = self.get_success(
# Now we join the local user to the room. We want to make this feel as close to
# the real `process_remote_join()` as possible but we'd like to avoid some of
# the auth checks that would be done in the real code.
#
# FIXME: The test was originally written using this less-real
# `persist_event(...)` shortcut but it would be nice to use the real remote join
# process in a `FederatingHomeserverTestCase`.
flawed_join_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[invite_tuple[0].event_id],
# This doesn't work correctly to create an `EventContext` that includes
# both of these state events. I assume it's because we're working on our
# local homeserver which has the remote state set as `outlier`. We have
# to create our own EventContext below to get this right.
auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id],
type=EventTypes.Member,
state_key=user1_id,
@@ -691,7 +708,22 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
**shared_kwargs,
)
)
self.get_success(persistence.persist_event(*join_tuple))
# We have to create our own context to get the state set correctly. If we use
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
# table will only have the join event in it which should never happen in our
# real server.
join_event = flawed_join_tuple[0]
join_context = self.get_success(
self.state_handler.compute_event_context(
join_event,
state_ids_before_event={
(e.type, e.state_key): e.event_id
for e in [create_tuple[0], invite_tuple[0]]
},
partial_state=False,
)
)
self.get_success(self.persistence.persist_event(join_event, join_context))
# Doing an SS request should return a positive `bump_stamp`, even though
# the only event that matches the bump types has a negative stream

View File

@@ -191,8 +191,14 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)
# Reset the in-memory cache
self.hs.get_sliding_sync_handler().connection_store._connections.clear()
# Reset the positions
self.get_success(
self.store.db_pool.simple_delete(
table="sliding_sync_connections",
keyvalues={"user_id": user1_id},
desc="clear_cache",
)
)
# Make the Sliding Sync request
channel = self.make_request(

View File

@@ -112,6 +112,24 @@ class UpdateUpsertManyTests(unittest.HomeserverTestCase):
{(1, "user1", "hello"), (2, "user2", "bleb")},
)
self.get_success(
self.storage.db_pool.runInteraction(
"test",
self.storage.db_pool.simple_upsert_many_txn,
self.table_name,
key_names=key_names,
key_values=[[2, "user2"]],
value_names=[],
value_values=[],
)
)
# Check results are what we expect
self.assertEqual(
set(self._dump_table_to_tuple()),
{(1, "user1", "hello"), (2, "user2", "bleb")},
)
def test_simple_update_many(self) -> None:
"""
simple_update_many performs many updates at once.

View File

@@ -19,6 +19,7 @@
#
#
import logging
from typing import List, Optional
from twisted.test.proto_helpers import MemoryReactor
@@ -35,6 +36,8 @@ from synapse.util import Clock
from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
class ExtremPruneTestCase(HomeserverTestCase):
servlets = [

View File

@@ -24,7 +24,7 @@ from typing import List, Optional, Tuple, cast
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.constants import EventContentFields, EventTypes, JoinRules, Membership
from synapse.api.room_versions import RoomVersions
from synapse.rest import admin
from synapse.rest.admin import register_servlets_for_client_rest_resource
@@ -38,6 +38,7 @@ from synapse.util import Clock
from tests import unittest
from tests.server import TestHomeServer
from tests.test_utils import event_injection
from tests.test_utils.event_injection import create_event
from tests.unittest import skip_unless
logger = logging.getLogger(__name__)
@@ -54,6 +55,10 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
# We can't test the RoomMemberStore on its own without the other event
# storage logic
self.store = hs.get_datastores().main
self.state_handler = self.hs.get_state_handler()
persistence = self.hs.get_storage_controllers().persistence
assert persistence is not None
self.persistence = persistence
self.u_alice = self.register_user("alice", "pass")
self.t_alice = self.login("alice", "pass")
@@ -220,31 +225,166 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
)
def test_join_locally_forgotten_room(self) -> None:
"""Tests if a user joins a forgotten room the room is not forgotten anymore."""
self.room = self.helper.create_room_as(self.u_alice, tok=self.t_alice)
self.assertFalse(
self.get_success(self.store.is_locally_forgotten_room(self.room))
)
"""
Tests if a user joins a forgotten room, the room is not forgotten anymore.
# after leaving and forget the room, it is forgotten
self.get_success(
event_injection.inject_member_event(
self.hs, self.room, self.u_alice, "leave"
Since a room can't be re-joined once everyone has left, this can only happen with
a room with remote users in it.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
# Create a remote room
creator = "@user:other"
room_id = "!foo:other"
room_version = RoomVersions.V10
shared_kwargs = {
"room_id": room_id,
"room_version": room_version.identifier,
}
create_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[],
type=EventTypes.Create,
state_key="",
content={
# The `ROOM_CREATOR` field could be removed if we used a room
# version > 10 (in favor of relying on `sender`)
EventContentFields.ROOM_CREATOR: creator,
EventContentFields.ROOM_VERSION: room_version.identifier,
},
sender=creator,
**shared_kwargs,
)
)
self.get_success(self.store.forget(self.u_alice, self.room))
self.assertTrue(
self.get_success(self.store.is_locally_forgotten_room(self.room))
)
# after rejoin the room is not forgotten anymore
self.get_success(
event_injection.inject_member_event(
self.hs, self.room, self.u_alice, "join"
creator_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[create_tuple[0].event_id],
auth_event_ids=[create_tuple[0].event_id],
type=EventTypes.Member,
state_key=creator,
content={"membership": Membership.JOIN},
sender=creator,
**shared_kwargs,
)
)
remote_events_and_contexts = [
create_tuple,
creator_tuple,
]
# Ensure the local HS knows the room version
self.get_success(self.store.store_room(room_id, creator, False, room_version))
# Persist these events as backfilled events.
for event, context in remote_events_and_contexts:
self.get_success(
self.persistence.persist_event(event, context, backfilled=True)
)
# Now we join the local user to the room. We want to make this feel as close to
# the real `process_remote_join()` as possible but we'd like to avoid some of
# the auth checks that would be done in the real code.
#
# FIXME: The test was originally written using this less-real
# `persist_event(...)` shortcut but it would be nice to use the real remote join
# process in a `FederatingHomeserverTestCase`.
flawed_join_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[creator_tuple[0].event_id],
# This doesn't work correctly to create an `EventContext` that includes
# both of these state events. I assume it's because we're working on our
# local homeserver which has the remote state set as `outlier`. We have
# to create our own EventContext below to get this right.
auth_event_ids=[create_tuple[0].event_id],
type=EventTypes.Member,
state_key=user1_id,
content={"membership": Membership.JOIN},
sender=user1_id,
**shared_kwargs,
)
)
# We have to create our own context to get the state set correctly. If we use
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
# table will only have the join event in it which should never happen in our
# real server.
join_event = flawed_join_tuple[0]
join_context = self.get_success(
self.state_handler.compute_event_context(
join_event,
state_ids_before_event={
(e.type, e.state_key): e.event_id for e in [create_tuple[0]]
},
partial_state=False,
)
)
self.get_success(self.persistence.persist_event(join_event, join_context))
# The room shouldn't be forgotten because the local user just joined
self.assertFalse(
self.get_success(self.store.is_locally_forgotten_room(self.room))
self.get_success(self.store.is_locally_forgotten_room(room_id))
)
# After all of the local users (there is only user1) leave and forget the
# room, it is forgotten
user1_leave_response = self.helper.leave(room_id, user1_id, tok=user1_tok)
user1_leave_event = self.get_success(
self.store.get_event(user1_leave_response["event_id"])
)
self.get_success(self.store.forget(user1_id, room_id))
self.assertTrue(self.get_success(self.store.is_locally_forgotten_room(room_id)))
# Join the local user to the room (again). We want to make this feel as close to
# the real `process_remote_join()` as possible but we'd like to avoid some of
# the auth checks that would be done in the real code.
#
# FIXME: The test was originally written using this less-real
# `event_injection.inject_member_event(...)` shortcut but it would be nice to
# use the real remote join process in a `FederatingHomeserverTestCase`.
flawed_join_tuple = self.get_success(
create_event(
self.hs,
prev_event_ids=[user1_leave_response["event_id"]],
# This doesn't work correctly to create an `EventContext` that includes
# both of these state events. I assume it's because we're working on our
# local homeserver which has the remote state set as `outlier`. We have
# to create our own EventContext below to get this right.
auth_event_ids=[
create_tuple[0].event_id,
user1_leave_response["event_id"],
],
type=EventTypes.Member,
state_key=user1_id,
content={"membership": Membership.JOIN},
sender=user1_id,
**shared_kwargs,
)
)
# We have to create our own context to get the state set correctly. If we use
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
# table will only have the join event in it which should never happen in our
# real server.
join_event = flawed_join_tuple[0]
join_context = self.get_success(
self.state_handler.compute_event_context(
join_event,
state_ids_before_event={
(e.type, e.state_key): e.event_id
for e in [create_tuple[0], user1_leave_event]
},
partial_state=False,
)
)
self.get_success(self.persistence.persist_event(join_event, join_context))
# After the local user rejoins the remote room, it isn't forgotten anymore
self.assertFalse(
self.get_success(self.store.is_locally_forgotten_room(room_id))
)

File diff suppressed because it is too large Load Diff

View File

@@ -272,8 +272,8 @@ class TestCase(unittest.TestCase):
def assertIncludes(
self,
actual_items: AbstractSet[str],
expected_items: AbstractSet[str],
actual_items: AbstractSet[TV],
expected_items: AbstractSet[TV],
exact: bool = False,
message: Optional[str] = None,
) -> None: