mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-09 01:30:18 +00:00
Compare commits
5 Commits
erikj/ss_h
...
s7evink/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3306a02e70 | ||
|
|
61d24a34c0 | ||
|
|
f7bca60920 | ||
|
|
9a3cb5a245 | ||
|
|
208630bbb0 |
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -485,18 +485,18 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.208"
|
||||
version = "1.0.206"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2"
|
||||
checksum = "5b3e4cd94123dd520a128bcd11e34d9e9e423e7e3e50425cb1b4b1e3549d0284"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.208"
|
||||
version = "1.0.206"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf"
|
||||
checksum = "fabfb6138d2383ea8208cf98ccf69cdfb1aff4088460681d84189aa259762f97"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -505,9 +505,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.125"
|
||||
version = "1.0.124"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed"
|
||||
checksum = "66ad62847a56b3dba58cc891acd13884b9c61138d330c0d7b6181713d4fce38d"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"memchr",
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
1
changelog.d/17543.bugfix
Normal file
1
changelog.d/17543.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix authenticated media responses using a wrong limit when following redirects over federation.
|
||||
@@ -1 +0,0 @@
|
||||
Store sliding sync per-connection state in the database.
|
||||
@@ -1 +0,0 @@
|
||||
Make the sliding sync `PerConnectionState` class immutable.
|
||||
93
poetry.lock
generated
93
poetry.lock
generated
@@ -403,38 +403,43 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "cryptography"
|
||||
version = "43.0.0"
|
||||
version = "42.0.8"
|
||||
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "cryptography-43.0.0-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:64c3f16e2a4fc51c0d06af28441881f98c5d91009b8caaff40cf3548089e9c74"},
|
||||
{file = "cryptography-43.0.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3dcdedae5c7710b9f97ac6bba7e1052b95c7083c9d0e9df96e02a1932e777895"},
|
||||
{file = "cryptography-43.0.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d9a1eca329405219b605fac09ecfc09ac09e595d6def650a437523fcd08dd22"},
|
||||
{file = "cryptography-43.0.0-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:ea9e57f8ea880eeea38ab5abf9fbe39f923544d7884228ec67d666abd60f5a47"},
|
||||
{file = "cryptography-43.0.0-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:9a8d6802e0825767476f62aafed40532bd435e8a5f7d23bd8b4f5fd04cc80ecf"},
|
||||
{file = "cryptography-43.0.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:cc70b4b581f28d0a254d006f26949245e3657d40d8857066c2ae22a61222ef55"},
|
||||
{file = "cryptography-43.0.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:4a997df8c1c2aae1e1e5ac49c2e4f610ad037fc5a3aadc7b64e39dea42249431"},
|
||||
{file = "cryptography-43.0.0-cp37-abi3-win32.whl", hash = "sha256:6e2b11c55d260d03a8cf29ac9b5e0608d35f08077d8c087be96287f43af3ccdc"},
|
||||
{file = "cryptography-43.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:31e44a986ceccec3d0498e16f3d27b2ee5fdf69ce2ab89b52eaad1d2f33d8778"},
|
||||
{file = "cryptography-43.0.0-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:7b3f5fe74a5ca32d4d0f302ffe6680fcc5c28f8ef0dc0ae8f40c0f3a1b4fca66"},
|
||||
{file = "cryptography-43.0.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ac1955ce000cb29ab40def14fd1bbfa7af2017cca696ee696925615cafd0dce5"},
|
||||
{file = "cryptography-43.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:299d3da8e00b7e2b54bb02ef58d73cd5f55fb31f33ebbf33bd00d9aa6807df7e"},
|
||||
{file = "cryptography-43.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:ee0c405832ade84d4de74b9029bedb7b31200600fa524d218fc29bfa371e97f5"},
|
||||
{file = "cryptography-43.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:cb013933d4c127349b3948aa8aaf2f12c0353ad0eccd715ca789c8a0f671646f"},
|
||||
{file = "cryptography-43.0.0-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:fdcb265de28585de5b859ae13e3846a8e805268a823a12a4da2597f1f5afc9f0"},
|
||||
{file = "cryptography-43.0.0-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:2905ccf93a8a2a416f3ec01b1a7911c3fe4073ef35640e7ee5296754e30b762b"},
|
||||
{file = "cryptography-43.0.0-cp39-abi3-win32.whl", hash = "sha256:47ca71115e545954e6c1d207dd13461ab81f4eccfcb1345eac874828b5e3eaaf"},
|
||||
{file = "cryptography-43.0.0-cp39-abi3-win_amd64.whl", hash = "sha256:0663585d02f76929792470451a5ba64424acc3cd5227b03921dab0e2f27b1709"},
|
||||
{file = "cryptography-43.0.0-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:2c6d112bf61c5ef44042c253e4859b3cbbb50df2f78fa8fae6747a7814484a70"},
|
||||
{file = "cryptography-43.0.0-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:844b6d608374e7d08f4f6e6f9f7b951f9256db41421917dfb2d003dde4cd6b66"},
|
||||
{file = "cryptography-43.0.0-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:51956cf8730665e2bdf8ddb8da0056f699c1a5715648c1b0144670c1ba00b48f"},
|
||||
{file = "cryptography-43.0.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:aae4d918f6b180a8ab8bf6511a419473d107df4dbb4225c7b48c5c9602c38c7f"},
|
||||
{file = "cryptography-43.0.0-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:232ce02943a579095a339ac4b390fbbe97f5b5d5d107f8a08260ea2768be8cc2"},
|
||||
{file = "cryptography-43.0.0-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:5bcb8a5620008a8034d39bce21dc3e23735dfdb6a33a06974739bfa04f853947"},
|
||||
{file = "cryptography-43.0.0-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:08a24a7070b2b6804c1940ff0f910ff728932a9d0e80e7814234269f9d46d069"},
|
||||
{file = "cryptography-43.0.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:e9c5266c432a1e23738d178e51c2c7a5e2ddf790f248be939448c0ba2021f9d1"},
|
||||
{file = "cryptography-43.0.0.tar.gz", hash = "sha256:b88075ada2d51aa9f18283532c9f60e72170041bba88d7f37e49cbb10275299e"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:81d8a521705787afe7a18d5bfb47ea9d9cc068206270aad0b96a725022e18d2e"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:961e61cefdcb06e0c6d7e3a1b22ebe8b996eb2bf50614e89384be54c48c6b63d"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e3ec3672626e1b9e55afd0df6d774ff0e953452886e06e0f1eb7eb0c832e8902"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e599b53fd95357d92304510fb7bda8523ed1f79ca98dce2f43c115950aa78801"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:5226d5d21ab681f432a9c1cf8b658c0cb02533eece706b155e5fbd8a0cdd3949"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:6b7c4f03ce01afd3b76cf69a5455caa9cfa3de8c8f493e0d3ab7d20611c8dae9"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:2346b911eb349ab547076f47f2e035fc8ff2c02380a7cbbf8d87114fa0f1c583"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:ad803773e9df0b92e0a817d22fd8a3675493f690b96130a5e24f1b8fabbea9c7"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:2f66d9cd9147ee495a8374a45ca445819f8929a3efcd2e3df6428e46c3cbb10b"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:d45b940883a03e19e944456a558b67a41160e367a719833c53de6911cabba2b7"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-win32.whl", hash = "sha256:a0c5b2b0585b6af82d7e385f55a8bc568abff8923af147ee3c07bd8b42cda8b2"},
|
||||
{file = "cryptography-42.0.8-cp37-abi3-win_amd64.whl", hash = "sha256:57080dee41209e556a9a4ce60d229244f7a66ef52750f813bfbe18959770cfba"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:dea567d1b0e8bc5764b9443858b673b734100c2871dc93163f58c46a97a83d28"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c4783183f7cb757b73b2ae9aed6599b96338eb957233c58ca8f49a49cc32fd5e"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a0608251135d0e03111152e41f0cc2392d1e74e35703960d4190b2e0f4ca9c70"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:dc0fdf6787f37b1c6b08e6dfc892d9d068b5bdb671198c72072828b80bd5fe4c"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:9c0c1716c8447ee7dbf08d6db2e5c41c688544c61074b54fc4564196f55c25a7"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:fff12c88a672ab9c9c1cf7b0c80e3ad9e2ebd9d828d955c126be4fd3e5578c9e"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:cafb92b2bc622cd1aa6a1dce4b93307792633f4c5fe1f46c6b97cf67073ec961"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:31f721658a29331f895a5a54e7e82075554ccfb8b163a18719d342f5ffe5ecb1"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:b297f90c5723d04bcc8265fc2a0f86d4ea2e0f7ab4b6994459548d3a6b992a14"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-win32.whl", hash = "sha256:2f88d197e66c65be5e42cd72e5c18afbfae3f741742070e3019ac8f4ac57262c"},
|
||||
{file = "cryptography-42.0.8-cp39-abi3-win_amd64.whl", hash = "sha256:fa76fbb7596cc5839320000cdd5d0955313696d9511debab7ee7278fc8b5c84a"},
|
||||
{file = "cryptography-42.0.8-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:ba4f0a211697362e89ad822e667d8d340b4d8d55fae72cdd619389fb5912eefe"},
|
||||
{file = "cryptography-42.0.8-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:81884c4d096c272f00aeb1f11cf62ccd39763581645b0812e99a91505fa48e0c"},
|
||||
{file = "cryptography-42.0.8-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:c9bb2ae11bfbab395bdd072985abde58ea9860ed84e59dbc0463a5d0159f5b71"},
|
||||
{file = "cryptography-42.0.8-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:7016f837e15b0a1c119d27ecd89b3515f01f90a8615ed5e9427e30d9cdbfed3d"},
|
||||
{file = "cryptography-42.0.8-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:5a94eccb2a81a309806027e1670a358b99b8fe8bfe9f8d329f27d72c094dde8c"},
|
||||
{file = "cryptography-42.0.8-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:dec9b018df185f08483f294cae6ccac29e7a6e0678996587363dc352dc65c842"},
|
||||
{file = "cryptography-42.0.8-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:343728aac38decfdeecf55ecab3264b015be68fc2816ca800db649607aeee648"},
|
||||
{file = "cryptography-42.0.8-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:013629ae70b40af70c9a7a5db40abe5d9054e6f4380e50ce769947b73bf3caad"},
|
||||
{file = "cryptography-42.0.8.tar.gz", hash = "sha256:8d09d05439ce7baa8e9e95b07ec5b6c886f548deb7e0f69ef25f64b3bce842f2"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -447,7 +452,7 @@ nox = ["nox"]
|
||||
pep8test = ["check-sdist", "click", "mypy", "ruff"]
|
||||
sdist = ["build"]
|
||||
ssh = ["bcrypt (>=3.1.5)"]
|
||||
test = ["certifi", "cryptography-vectors (==43.0.0)", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"]
|
||||
test = ["certifi", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"]
|
||||
test-randomorder = ["pytest-randomly"]
|
||||
|
||||
[[package]]
|
||||
@@ -2094,7 +2099,6 @@ files = [
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"},
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"},
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"},
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"},
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"},
|
||||
{file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"},
|
||||
@@ -2102,16 +2106,8 @@ files = [
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"},
|
||||
{file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
|
||||
{file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"},
|
||||
{file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"},
|
||||
{file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"},
|
||||
{file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"},
|
||||
@@ -2128,7 +2124,6 @@ files = [
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"},
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"},
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"},
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"},
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"},
|
||||
{file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"},
|
||||
@@ -2136,7 +2131,6 @@ files = [
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"},
|
||||
{file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"},
|
||||
{file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"},
|
||||
@@ -2409,13 +2403,13 @@ doc = ["Sphinx", "sphinx-rtd-theme"]
|
||||
|
||||
[[package]]
|
||||
name = "sentry-sdk"
|
||||
version = "2.13.0"
|
||||
version = "2.12.0"
|
||||
description = "Python client for Sentry (https://sentry.io)"
|
||||
optional = true
|
||||
python-versions = ">=3.6"
|
||||
files = [
|
||||
{file = "sentry_sdk-2.13.0-py2.py3-none-any.whl", hash = "sha256:6beede8fc2ab4043da7f69d95534e320944690680dd9a963178a49de71d726c6"},
|
||||
{file = "sentry_sdk-2.13.0.tar.gz", hash = "sha256:8d4a576f7a98eb2fdb40e13106e41f330e5c79d72a68be1316e7852cf4995260"},
|
||||
{file = "sentry_sdk-2.12.0-py2.py3-none-any.whl", hash = "sha256:7a8d5163d2ba5c5f4464628c6b68f85e86972f7c636acc78aed45c61b98b7a5e"},
|
||||
{file = "sentry_sdk-2.12.0.tar.gz", hash = "sha256:8763840497b817d44c49b3fe3f5f7388d083f2337ffedf008b2cdb63b5c86dc6"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -2442,7 +2436,6 @@ httpx = ["httpx (>=0.16.0)"]
|
||||
huey = ["huey (>=2)"]
|
||||
huggingface-hub = ["huggingface-hub (>=0.22)"]
|
||||
langchain = ["langchain (>=0.0.210)"]
|
||||
litestar = ["litestar (>=2.0.0)"]
|
||||
loguru = ["loguru (>=0.5)"]
|
||||
openai = ["openai (>=1.0.0)", "tiktoken (>=0.3.0)"]
|
||||
opentelemetry = ["opentelemetry-distro (>=0.35b0)"]
|
||||
@@ -2809,13 +2802,13 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "types-jsonschema"
|
||||
version = "4.23.0.20240813"
|
||||
version = "4.23.0.20240712"
|
||||
description = "Typing stubs for jsonschema"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "types-jsonschema-4.23.0.20240813.tar.gz", hash = "sha256:c93f48206f209a5bc4608d295ac39f172fb98b9e24159ce577dbd25ddb79a1c0"},
|
||||
{file = "types_jsonschema-4.23.0.20240813-py3-none-any.whl", hash = "sha256:be283e23f0b87547316c2ee6b0fd36d95ea30e921db06478029e10b5b6aa6ac3"},
|
||||
{file = "types-jsonschema-4.23.0.20240712.tar.gz", hash = "sha256:b20db728dcf7ea3e80e9bdeb55e8b8420c6c040cda14e8cf284465adee71d217"},
|
||||
{file = "types_jsonschema-4.23.0.20240712-py3-none-any.whl", hash = "sha256:8c33177ce95336241c1d61ccb56a9964d4361b99d5f1cd81a1ab4909b0dd7cf4"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -2907,13 +2900,13 @@ urllib3 = ">=2"
|
||||
|
||||
[[package]]
|
||||
name = "types-setuptools"
|
||||
version = "71.1.0.20240818"
|
||||
version = "71.1.0.20240726"
|
||||
description = "Typing stubs for setuptools"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "types-setuptools-71.1.0.20240818.tar.gz", hash = "sha256:f62eaffaa39774462c65fbb49368c4dc1d91a90a28371cb14e1af090ff0e41e3"},
|
||||
{file = "types_setuptools-71.1.0.20240818-py3-none-any.whl", hash = "sha256:c4f95302f88369ac0ac46c67ddbfc70c6c4dbbb184d9fed356244217a2934025"},
|
||||
{file = "types-setuptools-71.1.0.20240726.tar.gz", hash = "sha256:85ba28e9461bb1be86ebba4db0f1c2408f2b11115b1966334ea9dc464e29303e"},
|
||||
{file = "types_setuptools-71.1.0.20240726-py3-none-any.whl", hash = "sha256:a7775376f36e0ff09bcad236bf265777590a66b11623e48c20bfc30f1444ea36"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -38,7 +38,6 @@ from mypy.types import (
|
||||
NoneType,
|
||||
TupleType,
|
||||
TypeAliasType,
|
||||
TypeVarType,
|
||||
UninhabitedType,
|
||||
UnionType,
|
||||
)
|
||||
@@ -234,7 +233,6 @@ IMMUTABLE_CUSTOM_TYPES = {
|
||||
"synapse.synapse_rust.push.FilteredPushRules",
|
||||
# This is technically not immutable, but close enough.
|
||||
"signedjson.types.VerifyKey",
|
||||
"synapse.types.StrCollection",
|
||||
}
|
||||
|
||||
# Immutable containers only if the values are also immutable.
|
||||
@@ -300,7 +298,7 @@ def is_cacheable(
|
||||
|
||||
elif rt.type.fullname in MUTABLE_CONTAINER_TYPES:
|
||||
# Mutable containers are mutable regardless of their underlying type.
|
||||
return False, f"container {rt.type.fullname} is mutable"
|
||||
return False, None
|
||||
|
||||
elif "attrs" in rt.type.metadata:
|
||||
# attrs classes are only cachable iff it is frozen (immutable itself)
|
||||
@@ -320,9 +318,6 @@ def is_cacheable(
|
||||
else:
|
||||
return False, "non-frozen attrs class"
|
||||
|
||||
elif rt.type.is_enum:
|
||||
# We assume Enum values are immutable
|
||||
return True, None
|
||||
else:
|
||||
# Ensure we fail for unknown types, these generally means that the
|
||||
# above code is not complete.
|
||||
@@ -331,18 +326,6 @@ def is_cacheable(
|
||||
f"Don't know how to handle {rt.type.fullname} return type instance",
|
||||
)
|
||||
|
||||
elif isinstance(rt, TypeVarType):
|
||||
# We consider TypeVars immutable if they are bound to a set of immutable
|
||||
# types.
|
||||
if rt.values:
|
||||
for value in rt.values:
|
||||
ok, note = is_cacheable(value, signature, verbose)
|
||||
if not ok:
|
||||
return False, f"TypeVar bound not cacheable {value}"
|
||||
return True, None
|
||||
|
||||
return False, "TypeVar is unbound"
|
||||
|
||||
elif isinstance(rt, NoneType):
|
||||
# None is cachable.
|
||||
return True, None
|
||||
|
||||
@@ -129,11 +129,6 @@ BOOLEAN_COLUMNS = {
|
||||
"remote_media_cache": ["authenticated"],
|
||||
"room_stats_state": ["is_federatable"],
|
||||
"rooms": ["is_public", "has_auth_chain_index"],
|
||||
"sliding_sync_joined_rooms": ["is_encrypted"],
|
||||
"sliding_sync_membership_snapshots": [
|
||||
"has_known_state",
|
||||
"is_encrypted",
|
||||
],
|
||||
"users": ["shadow_banned", "approved", "locked", "suspended"],
|
||||
"un_partial_stated_event_stream": ["rejection_status_changed"],
|
||||
"users_who_share_rooms": ["share_private"],
|
||||
|
||||
@@ -245,8 +245,6 @@ class EventContentFields:
|
||||
# `m.room.encryption`` algorithm field
|
||||
ENCRYPTION_ALGORITHM: Final = "algorithm"
|
||||
|
||||
TOMBSTONE_SUCCESSOR_ROOM: Final = "replacement_room"
|
||||
|
||||
|
||||
class EventUnsignedContentFields:
|
||||
"""Fields found inside the 'unsigned' data on events"""
|
||||
|
||||
@@ -98,7 +98,6 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
|
||||
from synapse.storage.databases.main.search import SearchStore
|
||||
from synapse.storage.databases.main.session import SessionStore
|
||||
from synapse.storage.databases.main.signatures import SignatureWorkerStore
|
||||
from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
|
||||
from synapse.storage.databases.main.state import StateGroupWorkerStore
|
||||
from synapse.storage.databases.main.stats import StatsStore
|
||||
from synapse.storage.databases.main.stream import StreamWorkerStore
|
||||
@@ -160,7 +159,6 @@ class GenericWorkerStore(
|
||||
SessionStore,
|
||||
TaskSchedulerWorkerStore,
|
||||
ExperimentalFeaturesStore,
|
||||
SlidingSyncStore,
|
||||
):
|
||||
# Properties that multiple storage classes define. Tell mypy what the
|
||||
# expected type is.
|
||||
|
||||
@@ -17,7 +17,6 @@ import logging
|
||||
from itertools import chain
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AbstractSet,
|
||||
Any,
|
||||
Dict,
|
||||
List,
|
||||
@@ -46,6 +45,13 @@ from synapse.events.utils import parse_stripped_state_event, strip_event
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
from synapse.handlers.sliding_sync.extensions import SlidingSyncExtensionHandler
|
||||
from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore
|
||||
from synapse.handlers.sliding_sync.types import (
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
PerConnectionState,
|
||||
RoomSyncConfig,
|
||||
StateValues,
|
||||
)
|
||||
from synapse.logging.opentracing import (
|
||||
SynapseTags,
|
||||
log_kv,
|
||||
@@ -77,17 +83,7 @@ from synapse.types import (
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
OperationType,
|
||||
PerConnectionState,
|
||||
RoomSyncConfig,
|
||||
SlidingSyncConfig,
|
||||
SlidingSyncResult,
|
||||
StateValues,
|
||||
)
|
||||
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.visibility import filter_events_for_client
|
||||
@@ -111,6 +107,18 @@ class Sentinel(enum.Enum):
|
||||
UNSET_SENTINEL = object()
|
||||
|
||||
|
||||
# The event types that clients should consider as new activity.
|
||||
DEFAULT_BUMP_EVENT_TYPES = {
|
||||
EventTypes.Create,
|
||||
EventTypes.Message,
|
||||
EventTypes.Encrypted,
|
||||
EventTypes.Sticker,
|
||||
EventTypes.CallInvite,
|
||||
EventTypes.PollStart,
|
||||
EventTypes.LiveLocationShareStart,
|
||||
}
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _RoomMembershipForUser:
|
||||
"""
|
||||
@@ -198,7 +206,7 @@ class SlidingSyncHandler:
|
||||
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
|
||||
self.connection_store = SlidingSyncConnectionStore(self.store)
|
||||
self.connection_store = SlidingSyncConnectionStore()
|
||||
self.extensions = SlidingSyncExtensionHandler(hs)
|
||||
|
||||
async def wait_for_sync_for_user(
|
||||
@@ -322,7 +330,11 @@ class SlidingSyncHandler:
|
||||
sync_config, from_token
|
||||
)
|
||||
)
|
||||
new_connection_state = previous_connection_state.get_mutable()
|
||||
|
||||
await self.connection_store.mark_token_seen(
|
||||
sync_config=sync_config,
|
||||
from_token=from_token,
|
||||
)
|
||||
|
||||
# Get all of the room IDs that the user should be able to see in the sync
|
||||
# response
|
||||
@@ -340,10 +352,6 @@ class SlidingSyncHandler:
|
||||
)
|
||||
)
|
||||
|
||||
lists_to_rooms: Mapping[str, AbstractSet[str]] = {}
|
||||
if previous_connection_state is not None:
|
||||
lists_to_rooms = previous_connection_state.list_to_rooms
|
||||
|
||||
# Assemble sliding window lists
|
||||
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
|
||||
# Keep track of the rooms that we can display and need to fetch more info about
|
||||
@@ -360,26 +368,13 @@ class SlidingSyncHandler:
|
||||
|
||||
for list_key, list_config in sync_config.lists.items():
|
||||
# Apply filters
|
||||
previous_found_rooms = lists_to_rooms.get(list_key)
|
||||
if previous_found_rooms:
|
||||
filtered_sync_room_map = {
|
||||
room_id: sync_room_map[room_id]
|
||||
for room_id in previous_found_rooms
|
||||
}
|
||||
|
||||
# TODO: Record changes to the list.
|
||||
else:
|
||||
filtered_sync_room_map = sync_room_map
|
||||
if list_config.filters is not None:
|
||||
filtered_sync_room_map = await self.filter_rooms(
|
||||
sync_config.user,
|
||||
sync_room_map,
|
||||
list_config.filters,
|
||||
to_token,
|
||||
)
|
||||
|
||||
new_connection_state.list_to_rooms[list_key] = set(
|
||||
filtered_sync_room_map.keys()
|
||||
filtered_sync_room_map = sync_room_map
|
||||
if list_config.filters is not None:
|
||||
filtered_sync_room_map = await self.filter_rooms(
|
||||
sync_config.user,
|
||||
sync_room_map,
|
||||
list_config.filters,
|
||||
to_token,
|
||||
)
|
||||
|
||||
# Find which rooms are partially stated and may need to be filtered out
|
||||
@@ -435,11 +430,15 @@ class SlidingSyncHandler:
|
||||
room_id
|
||||
)
|
||||
if existing_room_sync_config is not None:
|
||||
room_sync_config = existing_room_sync_config.combine_room_sync_config(
|
||||
existing_room_sync_config.combine_room_sync_config(
|
||||
room_sync_config
|
||||
)
|
||||
|
||||
relevant_room_map[room_id] = room_sync_config
|
||||
else:
|
||||
# Make a copy so if we modify it later, it doesn't
|
||||
# affect all references.
|
||||
relevant_room_map[room_id] = (
|
||||
room_sync_config.deep_copy()
|
||||
)
|
||||
|
||||
room_ids_in_list.append(room_id)
|
||||
|
||||
@@ -504,13 +503,11 @@ class SlidingSyncHandler:
|
||||
# and need to fetch more info about.
|
||||
existing_room_sync_config = relevant_room_map.get(room_id)
|
||||
if existing_room_sync_config is not None:
|
||||
room_sync_config = (
|
||||
existing_room_sync_config.combine_room_sync_config(
|
||||
room_sync_config
|
||||
)
|
||||
existing_room_sync_config.combine_room_sync_config(
|
||||
room_sync_config
|
||||
)
|
||||
|
||||
relevant_room_map[room_id] = room_sync_config
|
||||
else:
|
||||
relevant_room_map[room_id] = room_sync_config
|
||||
|
||||
# Fetch room data
|
||||
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
|
||||
@@ -572,6 +569,8 @@ class SlidingSyncHandler:
|
||||
if room_id in rooms_should_send
|
||||
}
|
||||
|
||||
new_connection_state = previous_connection_state.get_mutable()
|
||||
|
||||
@trace
|
||||
@tag_args
|
||||
async def handle_room(room_id: str) -> None:
|
||||
@@ -2230,9 +2229,7 @@ class SlidingSyncHandler:
|
||||
# Figure out the last bump event in the room
|
||||
last_bump_event_result = (
|
||||
await self.store.get_last_event_pos_in_room_before_stream_ordering(
|
||||
room_id,
|
||||
to_token.room_key,
|
||||
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
|
||||
room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -19,6 +19,11 @@ from typing_extensions import assert_never
|
||||
|
||||
from synapse.api.constants import AccountDataTypes
|
||||
from synapse.handlers.receipts import ReceiptEventSource
|
||||
from synapse.handlers.sliding_sync.types import (
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
PerConnectionState,
|
||||
)
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.types import (
|
||||
DeviceListUpdates,
|
||||
@@ -27,14 +32,7 @@ from synapse.types import (
|
||||
SlidingSyncStreamToken,
|
||||
StreamToken,
|
||||
)
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
OperationType,
|
||||
PerConnectionState,
|
||||
SlidingSyncConfig,
|
||||
SlidingSyncResult,
|
||||
)
|
||||
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
@@ -13,18 +13,18 @@
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from typing import TYPE_CHECKING, Dict, Optional, Tuple
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.storage.databases.main import DataStore
|
||||
from synapse.types import SlidingSyncStreamToken
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
from synapse.api.errors import SlidingSyncUnknownPosition
|
||||
from synapse.handlers.sliding_sync.types import (
|
||||
MutablePerConnectionState,
|
||||
PerConnectionState,
|
||||
SlidingSyncConfig,
|
||||
)
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.types import SlidingSyncStreamToken
|
||||
from synapse.types.handlers import SlidingSyncConfig
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
@@ -61,7 +61,20 @@ class SlidingSyncConnectionStore:
|
||||
to mapping of room ID to `HaveSentRoom`.
|
||||
"""
|
||||
|
||||
store: "DataStore"
|
||||
# `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState`
|
||||
_connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory(
|
||||
dict
|
||||
)
|
||||
|
||||
async def is_valid_token(
|
||||
self, sync_config: SlidingSyncConfig, connection_token: int
|
||||
) -> bool:
|
||||
"""Return whether the connection token is valid/recognized"""
|
||||
if connection_token == 0:
|
||||
return True
|
||||
|
||||
conn_key = self._get_connection_key(sync_config)
|
||||
return connection_token in self._connections.get(conn_key, {})
|
||||
|
||||
async def get_per_connection_state(
|
||||
self,
|
||||
@@ -73,20 +86,23 @@ class SlidingSyncConnectionStore:
|
||||
Raises:
|
||||
SlidingSyncUnknownPosition if the connection_token is unknown
|
||||
"""
|
||||
if from_token is None or from_token.connection_position == 0:
|
||||
if from_token is None:
|
||||
return PerConnectionState()
|
||||
|
||||
conn_id = sync_config.conn_id or ""
|
||||
connection_position = from_token.connection_position
|
||||
if connection_position == 0:
|
||||
# Initial sync (request without a `from_token`) starts at `0` so
|
||||
# there is no existing per-connection state
|
||||
return PerConnectionState()
|
||||
|
||||
device_id = sync_config.requester.device_id
|
||||
assert device_id is not None
|
||||
conn_key = self._get_connection_key(sync_config)
|
||||
sync_statuses = self._connections.get(conn_key, {})
|
||||
connection_state = sync_statuses.get(connection_position)
|
||||
|
||||
return await self.store.get_per_connection_state(
|
||||
sync_config.user.to_string(),
|
||||
device_id,
|
||||
conn_id,
|
||||
from_token.connection_position,
|
||||
)
|
||||
if connection_state is None:
|
||||
raise SlidingSyncUnknownPosition()
|
||||
|
||||
return connection_state
|
||||
|
||||
@trace
|
||||
async def record_new_state(
|
||||
@@ -100,24 +116,85 @@ class SlidingSyncConnectionStore:
|
||||
If there are no changes to the state this may return the same token as
|
||||
the existing per-connection state.
|
||||
"""
|
||||
prev_connection_token = 0
|
||||
if from_token is not None:
|
||||
prev_connection_token = from_token.connection_position
|
||||
|
||||
if not new_connection_state.has_updates():
|
||||
if from_token is not None:
|
||||
return from_token.connection_position
|
||||
else:
|
||||
return 0
|
||||
return prev_connection_token
|
||||
|
||||
if from_token is not None and from_token.connection_position == 0:
|
||||
from_token = None
|
||||
conn_key = self._get_connection_key(sync_config)
|
||||
sync_statuses = self._connections.setdefault(conn_key, {})
|
||||
|
||||
# Generate a new token, removing any existing entries in that token
|
||||
# (which can happen if requests get resent).
|
||||
new_store_token = prev_connection_token + 1
|
||||
sync_statuses.pop(new_store_token, None)
|
||||
|
||||
# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
|
||||
# don't grow forever.
|
||||
sync_statuses[new_store_token] = new_connection_state.copy()
|
||||
|
||||
return new_store_token
|
||||
|
||||
@trace
|
||||
async def mark_token_seen(
|
||||
self,
|
||||
sync_config: SlidingSyncConfig,
|
||||
from_token: Optional[SlidingSyncStreamToken],
|
||||
) -> None:
|
||||
"""We have received a request with the given token, so we can clear out
|
||||
any other tokens associated with the connection.
|
||||
|
||||
If there is no from token then we have started afresh, and so we delete
|
||||
all tokens associated with the device.
|
||||
"""
|
||||
# Clear out any tokens for the connection that doesn't match the one
|
||||
# from the request.
|
||||
|
||||
conn_key = self._get_connection_key(sync_config)
|
||||
sync_statuses = self._connections.pop(conn_key, {})
|
||||
if from_token is None:
|
||||
return
|
||||
|
||||
sync_statuses = {
|
||||
connection_token: room_statuses
|
||||
for connection_token, room_statuses in sync_statuses.items()
|
||||
if connection_token == from_token.connection_position
|
||||
}
|
||||
if sync_statuses:
|
||||
self._connections[conn_key] = sync_statuses
|
||||
|
||||
@staticmethod
|
||||
def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]:
|
||||
"""Return a unique identifier for this connection.
|
||||
|
||||
The first part is simply the user ID.
|
||||
|
||||
The second part is generally a combination of device ID and conn_id.
|
||||
However, both these two are optional (e.g. puppet access tokens don't
|
||||
have device IDs), so this handles those edge cases.
|
||||
|
||||
We use this over the raw `conn_id` to avoid clashes between different
|
||||
clients that use the same `conn_id`. Imagine a user uses a web client
|
||||
that uses `conn_id: main_sync_loop` and an Android client that also has
|
||||
a `conn_id: main_sync_loop`.
|
||||
"""
|
||||
|
||||
user_id = sync_config.user.to_string()
|
||||
|
||||
# Only one sliding sync connection is allowed per given conn_id (empty
|
||||
# or not).
|
||||
conn_id = sync_config.conn_id or ""
|
||||
|
||||
device_id = sync_config.requester.device_id
|
||||
assert device_id is not None
|
||||
if sync_config.requester.device_id:
|
||||
return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}")
|
||||
|
||||
return await self.store.persist_per_connection_state(
|
||||
sync_config.user.to_string(),
|
||||
device_id,
|
||||
conn_id,
|
||||
from_token.connection_position if from_token else None,
|
||||
new_connection_state,
|
||||
)
|
||||
if sync_config.requester.access_token_id:
|
||||
# If we don't have a device, then the access token ID should be a
|
||||
# stable ID.
|
||||
return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}")
|
||||
|
||||
# If we have neither then its likely an AS or some weird token. Either
|
||||
# way we can just fail here.
|
||||
raise Exception("Cannot use sliding sync with access token type")
|
||||
|
||||
@@ -18,393 +18,29 @@ from collections import ChainMap
|
||||
from enum import Enum
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AbstractSet,
|
||||
Callable,
|
||||
Dict,
|
||||
Final,
|
||||
Generic,
|
||||
List,
|
||||
Mapping,
|
||||
MutableMapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
Tuple,
|
||||
TypeVar,
|
||||
cast,
|
||||
)
|
||||
|
||||
import attr
|
||||
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.types import MultiWriterStreamToken, RoomStreamToken, StrCollection, UserID
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import Extra
|
||||
else:
|
||||
from pydantic import Extra
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import (
|
||||
DeviceListUpdates,
|
||||
JsonDict,
|
||||
JsonMapping,
|
||||
Requester,
|
||||
SlidingSyncStreamToken,
|
||||
StreamToken,
|
||||
)
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
from synapse.types.handlers import SlidingSyncConfig
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
pass
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Sliding Sync: The event types that clients should consider as new activity and affect
|
||||
# the `bump_stamp`
|
||||
SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES = {
|
||||
EventTypes.Create,
|
||||
EventTypes.Message,
|
||||
EventTypes.Encrypted,
|
||||
EventTypes.Sticker,
|
||||
EventTypes.CallInvite,
|
||||
EventTypes.PollStart,
|
||||
EventTypes.LiveLocationShareStart,
|
||||
}
|
||||
|
||||
|
||||
class SlidingSyncConfig(SlidingSyncBody):
|
||||
"""
|
||||
Inherit from `SlidingSyncBody` since we need all of the same fields and add a few
|
||||
extra fields that we need in the handler
|
||||
"""
|
||||
|
||||
user: UserID
|
||||
requester: Requester
|
||||
|
||||
# Pydantic config
|
||||
class Config:
|
||||
# By default, ignore fields that we don't recognise.
|
||||
extra = Extra.ignore
|
||||
# By default, don't allow fields to be reassigned after parsing.
|
||||
allow_mutation = False
|
||||
# Allow custom types like `UserID` to be used in the model
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
|
||||
class OperationType(Enum):
|
||||
"""
|
||||
Represents the operation types in a Sliding Sync window.
|
||||
|
||||
Attributes:
|
||||
SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about
|
||||
entries in this range.
|
||||
INSERT: Sets a single entry. If the position is not empty then clients MUST move
|
||||
entries to the left or the right depending on where the closest empty space is.
|
||||
DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
|
||||
places.
|
||||
INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
|
||||
offline support, but they should be treated as empty when additional operations
|
||||
which concern indexes in the range arrive from the server.
|
||||
"""
|
||||
|
||||
SYNC: Final = "SYNC"
|
||||
INSERT: Final = "INSERT"
|
||||
DELETE: Final = "DELETE"
|
||||
INVALIDATE: Final = "INVALIDATE"
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class SlidingSyncResult:
|
||||
"""
|
||||
The Sliding Sync result to be serialized to JSON for a response.
|
||||
|
||||
Attributes:
|
||||
next_pos: The next position token in the sliding window to request (next_batch).
|
||||
lists: Sliding window API. A map of list key to list results.
|
||||
rooms: Room subscription API. A map of room ID to room results.
|
||||
extensions: Extensions API. A map of extension key to extension results.
|
||||
"""
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class RoomResult:
|
||||
"""
|
||||
Attributes:
|
||||
name: Room name or calculated room name.
|
||||
avatar: Room avatar
|
||||
heroes: List of stripped membership events (containing `user_id` and optionally
|
||||
`avatar_url` and `displayname`) for the users used to calculate the room name.
|
||||
is_dm: Flag to specify whether the room is a direct-message room (most likely
|
||||
between two people).
|
||||
initial: Flag which is set when this is the first time the server is sending this
|
||||
data on this connection. Clients can use this flag to replace or update
|
||||
their local state. When there is an update, servers MUST omit this flag
|
||||
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
|
||||
absence of this flag means 'false'.
|
||||
unstable_expanded_timeline: Flag which is set if we're returning more historic
|
||||
events due to the timeline limit having increased. See "XXX: Odd behavior"
|
||||
comment ing `synapse.handlers.sliding_sync`.
|
||||
required_state: The current state of the room
|
||||
timeline: Latest events in the room. The last event is the most recent.
|
||||
bundled_aggregations: A mapping of event ID to the bundled aggregations for
|
||||
the timeline events above. This allows clients to show accurate reaction
|
||||
counts (or edits, threads), even if some of the reaction events were skipped
|
||||
over in a gappy sync.
|
||||
stripped_state: Stripped state events (for rooms where the usre is
|
||||
invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
|
||||
absent on joined/left rooms
|
||||
prev_batch: A token that can be passed as a start parameter to the
|
||||
`/rooms/<room_id>/messages` API to retrieve earlier messages.
|
||||
limited: True if there are more events than `timeline_limit` looking
|
||||
backwards from the `response.pos` to the `request.pos`.
|
||||
num_live: The number of timeline events which have just occurred and are not historical.
|
||||
The last N events are 'live' and should be treated as such. This is mostly
|
||||
useful to determine whether a given @mention event should make a noise or not.
|
||||
Clients cannot rely solely on the absence of `initial: true` to determine live
|
||||
events because if a room not in the sliding window bumps into the window because
|
||||
of an @mention it will have `initial: true` yet contain a single live event
|
||||
(with potentially other old events in the timeline).
|
||||
bump_stamp: The `stream_ordering` of the last event according to the
|
||||
`bump_event_types`. This helps clients sort more readily without them
|
||||
needing to pull in a bunch of the timeline to determine the last activity.
|
||||
`bump_event_types` is a thing because for example, we don't want display
|
||||
name changes to mark the room as unread and bump it to the top. For
|
||||
encrypted rooms, we just have to consider any activity as a bump because we
|
||||
can't see the content and the client has to figure it out for themselves.
|
||||
joined_count: The number of users with membership of join, including the client's
|
||||
own user ID. (same as sync `v2 m.joined_member_count`)
|
||||
invited_count: The number of users with membership of invite. (same as sync v2
|
||||
`m.invited_member_count`)
|
||||
notification_count: The total number of unread notifications for this room. (same
|
||||
as sync v2)
|
||||
highlight_count: The number of unread notifications for this room with the highlight
|
||||
flag set. (same as sync v2)
|
||||
"""
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class StrippedHero:
|
||||
user_id: str
|
||||
display_name: Optional[str]
|
||||
avatar_url: Optional[str]
|
||||
|
||||
name: Optional[str]
|
||||
avatar: Optional[str]
|
||||
heroes: Optional[List[StrippedHero]]
|
||||
is_dm: bool
|
||||
initial: bool
|
||||
unstable_expanded_timeline: bool
|
||||
# Should be empty for invite/knock rooms with `stripped_state`
|
||||
required_state: List[EventBase]
|
||||
# Should be empty for invite/knock rooms with `stripped_state`
|
||||
timeline_events: List[EventBase]
|
||||
bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
|
||||
# Optional because it's only relevant to invite/knock rooms
|
||||
stripped_state: List[JsonDict]
|
||||
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
|
||||
prev_batch: Optional[StreamToken]
|
||||
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
|
||||
limited: Optional[bool]
|
||||
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
|
||||
num_live: Optional[int]
|
||||
bump_stamp: int
|
||||
joined_count: int
|
||||
invited_count: int
|
||||
notification_count: int
|
||||
highlight_count: int
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return (
|
||||
# If this is the first time the client is seeing the room, we should not filter it out
|
||||
# under any circumstance.
|
||||
self.initial
|
||||
# We need to let the client know if there are any new events
|
||||
or bool(self.required_state)
|
||||
or bool(self.timeline_events)
|
||||
or bool(self.stripped_state)
|
||||
)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class SlidingWindowList:
|
||||
"""
|
||||
Attributes:
|
||||
count: The total number of entries in the list. Always present if this list
|
||||
is.
|
||||
ops: The sliding list operations to perform.
|
||||
"""
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class Operation:
|
||||
"""
|
||||
Attributes:
|
||||
op: The operation type to perform.
|
||||
range: Which index positions are affected by this operation. These are
|
||||
both inclusive.
|
||||
room_ids: Which room IDs are affected by this operation. These IDs match
|
||||
up to the positions in the `range`, so the last room ID in this list
|
||||
matches the 9th index. The room data is held in a separate object.
|
||||
"""
|
||||
|
||||
op: OperationType
|
||||
range: Tuple[int, int]
|
||||
room_ids: List[str]
|
||||
|
||||
count: int
|
||||
ops: List[Operation]
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class Extensions:
|
||||
"""Responses for extensions
|
||||
|
||||
Attributes:
|
||||
to_device: The to-device extension (MSC3885)
|
||||
e2ee: The E2EE device extension (MSC3884)
|
||||
"""
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class ToDeviceExtension:
|
||||
"""The to-device extension (MSC3885)
|
||||
|
||||
Attributes:
|
||||
next_batch: The to-device stream token the client should use
|
||||
to get more results
|
||||
events: A list of to-device messages for the client
|
||||
"""
|
||||
|
||||
next_batch: str
|
||||
events: Sequence[JsonMapping]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(self.events)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class E2eeExtension:
|
||||
"""The E2EE device extension (MSC3884)
|
||||
|
||||
Attributes:
|
||||
device_list_updates: List of user_ids whose devices have changed or left (only
|
||||
present on incremental syncs).
|
||||
device_one_time_keys_count: Map from key algorithm to the number of
|
||||
unclaimed one-time keys currently held on the server for this device. If
|
||||
an algorithm is unlisted, the count for that algorithm is assumed to be
|
||||
zero. If this entire parameter is missing, the count for all algorithms
|
||||
is assumed to be zero.
|
||||
device_unused_fallback_key_types: List of unused fallback key algorithms
|
||||
for this device.
|
||||
"""
|
||||
|
||||
# Only present on incremental syncs
|
||||
device_list_updates: Optional[DeviceListUpdates]
|
||||
device_one_time_keys_count: Mapping[str, int]
|
||||
device_unused_fallback_key_types: Sequence[str]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
# Note that "signed_curve25519" is always returned in key count responses
|
||||
# regardless of whether we uploaded any keys for it. This is necessary until
|
||||
# https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
|
||||
#
|
||||
# Also related:
|
||||
# https://github.com/element-hq/element-android/issues/3725 and
|
||||
# https://github.com/matrix-org/synapse/issues/10456
|
||||
default_otk = self.device_one_time_keys_count.get("signed_curve25519")
|
||||
more_than_default_otk = len(self.device_one_time_keys_count) > 1 or (
|
||||
default_otk is not None and default_otk > 0
|
||||
)
|
||||
|
||||
return bool(
|
||||
more_than_default_otk
|
||||
or self.device_list_updates
|
||||
or self.device_unused_fallback_key_types
|
||||
)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class AccountDataExtension:
|
||||
"""The Account Data extension (MSC3959)
|
||||
|
||||
Attributes:
|
||||
global_account_data_map: Mapping from `type` to `content` of global account
|
||||
data events.
|
||||
account_data_by_room_map: Mapping from room_id to mapping of `type` to
|
||||
`content` of room account data events.
|
||||
"""
|
||||
|
||||
global_account_data_map: Mapping[str, JsonMapping]
|
||||
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(
|
||||
self.global_account_data_map or self.account_data_by_room_map
|
||||
)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class ReceiptsExtension:
|
||||
"""The Receipts extension (MSC3960)
|
||||
|
||||
Attributes:
|
||||
room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
|
||||
event (type, content)
|
||||
"""
|
||||
|
||||
room_id_to_receipt_map: Mapping[str, JsonMapping]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(self.room_id_to_receipt_map)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class TypingExtension:
|
||||
"""The Typing Notification extension (MSC3961)
|
||||
|
||||
Attributes:
|
||||
room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
|
||||
event (type, content)
|
||||
"""
|
||||
|
||||
room_id_to_typing_map: Mapping[str, JsonMapping]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(self.room_id_to_typing_map)
|
||||
|
||||
to_device: Optional[ToDeviceExtension] = None
|
||||
e2ee: Optional[E2eeExtension] = None
|
||||
account_data: Optional[AccountDataExtension] = None
|
||||
receipts: Optional[ReceiptsExtension] = None
|
||||
typing: Optional[TypingExtension] = None
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(
|
||||
self.to_device
|
||||
or self.e2ee
|
||||
or self.account_data
|
||||
or self.receipts
|
||||
or self.typing
|
||||
)
|
||||
|
||||
next_pos: SlidingSyncStreamToken
|
||||
lists: Dict[str, SlidingWindowList]
|
||||
rooms: Dict[str, RoomResult]
|
||||
extensions: Extensions
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
"""Make the result appear empty if there are no updates. This is used
|
||||
to tell if the notifier needs to wait for more events when polling for
|
||||
events.
|
||||
"""
|
||||
# We don't include `self.lists` here, as a) `lists` is always non-empty even if
|
||||
# there are no changes, and b) since we're sorting rooms by `stream_ordering` of
|
||||
# the latest activity, anything that would cause the order to change would end
|
||||
# up in `self.rooms` and cause us to send down the change.
|
||||
return bool(self.rooms or self.extensions)
|
||||
|
||||
@staticmethod
|
||||
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
|
||||
"Return a new empty result"
|
||||
return SlidingSyncResult(
|
||||
next_pos=next_pos,
|
||||
lists={},
|
||||
rooms={},
|
||||
extensions=SlidingSyncResult.Extensions(),
|
||||
)
|
||||
|
||||
|
||||
class StateValues:
|
||||
"""
|
||||
@@ -424,7 +60,7 @@ class StateValues:
|
||||
|
||||
# We can't freeze this class because we want to update it in place with the
|
||||
# de-duplicated data.
|
||||
@attr.s(slots=True, auto_attribs=True, frozen=True)
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class RoomSyncConfig:
|
||||
"""
|
||||
Holds the config for what data we should fetch for a room in the sync response.
|
||||
@@ -438,7 +74,7 @@ class RoomSyncConfig:
|
||||
"""
|
||||
|
||||
timeline_limit: int
|
||||
required_state_map: Mapping[str, AbstractSet[str]]
|
||||
required_state_map: Dict[str, Set[str]]
|
||||
|
||||
@classmethod
|
||||
def from_room_config(
|
||||
@@ -512,7 +148,7 @@ class RoomSyncConfig:
|
||||
|
||||
def deep_copy(self) -> "RoomSyncConfig":
|
||||
required_state_map: Dict[str, Set[str]] = {
|
||||
state_type: set(state_key_set)
|
||||
state_type: state_key_set.copy()
|
||||
for state_type, state_key_set in self.required_state_map.items()
|
||||
}
|
||||
|
||||
@@ -523,20 +159,14 @@ class RoomSyncConfig:
|
||||
|
||||
def combine_room_sync_config(
|
||||
self, other_room_sync_config: "RoomSyncConfig"
|
||||
) -> "RoomSyncConfig":
|
||||
) -> None:
|
||||
"""
|
||||
Combine this `RoomSyncConfig` with another `RoomSyncConfig` and return the
|
||||
Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the
|
||||
superset union of the two.
|
||||
"""
|
||||
timeline_limit = self.timeline_limit
|
||||
required_state_map = {
|
||||
event_type: set(state_keys)
|
||||
for event_type, state_keys in self.required_state_map.items()
|
||||
}
|
||||
|
||||
# Take the highest timeline limit
|
||||
if self.timeline_limit < other_room_sync_config.timeline_limit:
|
||||
timeline_limit = other_room_sync_config.timeline_limit
|
||||
self.timeline_limit = other_room_sync_config.timeline_limit
|
||||
|
||||
# Union the required state
|
||||
for (
|
||||
@@ -545,14 +175,14 @@ class RoomSyncConfig:
|
||||
) in other_room_sync_config.required_state_map.items():
|
||||
# If we already have a wildcard for everything, we don't need to add
|
||||
# anything else
|
||||
if StateValues.WILDCARD in required_state_map.get(
|
||||
if StateValues.WILDCARD in self.required_state_map.get(
|
||||
StateValues.WILDCARD, set()
|
||||
):
|
||||
break
|
||||
|
||||
# If we already have a wildcard `state_key` for this `state_type`, we don't need
|
||||
# to add anything else
|
||||
if StateValues.WILDCARD in required_state_map.get(state_type, set()):
|
||||
if StateValues.WILDCARD in self.required_state_map.get(state_type, set()):
|
||||
continue
|
||||
|
||||
# If we're getting wildcards for the `state_type` and `state_key`, that's
|
||||
@@ -561,14 +191,16 @@ class RoomSyncConfig:
|
||||
state_type == StateValues.WILDCARD
|
||||
and StateValues.WILDCARD in state_key_set
|
||||
):
|
||||
required_state_map = {state_type: {StateValues.WILDCARD}}
|
||||
self.required_state_map = {state_type: {StateValues.WILDCARD}}
|
||||
# We can break, since we don't need to add anything else
|
||||
break
|
||||
|
||||
for state_key in state_key_set:
|
||||
# If we already have a wildcard for this specific `state_key`, we don't need
|
||||
# to add it since the wildcard already covers it.
|
||||
if state_key in required_state_map.get(StateValues.WILDCARD, set()):
|
||||
if state_key in self.required_state_map.get(
|
||||
StateValues.WILDCARD, set()
|
||||
):
|
||||
continue
|
||||
|
||||
# If we're getting a wildcard for the `state_type`, get rid of any other
|
||||
@@ -579,7 +211,7 @@ class RoomSyncConfig:
|
||||
# Make a copy so we don't run into an error: `dictionary changed size
|
||||
# during iteration`, when we remove items
|
||||
for existing_state_type, existing_state_key_set in list(
|
||||
required_state_map.items()
|
||||
self.required_state_map.items()
|
||||
):
|
||||
# Make a copy so we don't run into an error: `Set changed size during
|
||||
# iteration`, when we filter out and remove items
|
||||
@@ -589,21 +221,19 @@ class RoomSyncConfig:
|
||||
|
||||
# If we've the left the `set()` empty, remove it from the map
|
||||
if existing_state_key_set == set():
|
||||
required_state_map.pop(existing_state_type, None)
|
||||
self.required_state_map.pop(existing_state_type, None)
|
||||
|
||||
# If we're getting a wildcard `state_key`, get rid of any other state_keys
|
||||
# for this `state_type` since the wildcard will cover it already.
|
||||
if state_key == StateValues.WILDCARD:
|
||||
required_state_map[state_type] = {state_key}
|
||||
self.required_state_map[state_type] = {state_key}
|
||||
break
|
||||
# Otherwise, just add it to the set
|
||||
else:
|
||||
if required_state_map.get(state_type) is None:
|
||||
required_state_map[state_type] = {state_key}
|
||||
if self.required_state_map.get(state_type) is None:
|
||||
self.required_state_map[state_type] = {state_key}
|
||||
else:
|
||||
required_state_map[state_type].add(state_key)
|
||||
|
||||
return RoomSyncConfig(timeline_limit, required_state_map)
|
||||
self.required_state_map[state_type].add(state_key)
|
||||
|
||||
def must_await_full_state(
|
||||
self,
|
||||
@@ -694,7 +324,7 @@ class HaveSentRoomFlag(Enum):
|
||||
LIVE = "live"
|
||||
|
||||
|
||||
T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken)
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True, slots=True, frozen=True)
|
||||
@@ -753,9 +383,6 @@ class RoomStatusMap(Generic[T]):
|
||||
|
||||
return RoomStatusMap(statuses=dict(self._statuses))
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._statuses)
|
||||
|
||||
|
||||
class MutableRoomStatusMap(RoomStatusMap[T]):
|
||||
"""A mutable version of `RoomStatusMap`"""
|
||||
@@ -812,7 +439,7 @@ class MutableRoomStatusMap(RoomStatusMap[T]):
|
||||
self._statuses[room_id] = HaveSentRoom.previously(from_token)
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True, frozen=True)
|
||||
@attr.s(auto_attribs=True)
|
||||
class PerConnectionState:
|
||||
"""The per-connection state. A snapshot of what we've sent down the
|
||||
connection before.
|
||||
@@ -840,18 +467,14 @@ class PerConnectionState:
|
||||
|
||||
room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
|
||||
|
||||
list_to_rooms: Mapping[str, AbstractSet[str]] = attr.Factory(dict)
|
||||
|
||||
def get_mutable(self) -> "MutablePerConnectionState":
|
||||
"""Get a mutable copy of this state."""
|
||||
room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs)
|
||||
list_to_rooms = cast(MutableMapping[str, Set[str]], self.list_to_rooms)
|
||||
|
||||
return MutablePerConnectionState(
|
||||
rooms=self.rooms.get_mutable(),
|
||||
receipts=self.receipts.get_mutable(),
|
||||
room_configs=ChainMap({}, room_configs),
|
||||
list_to_rooms=ChainMap({}, list_to_rooms),
|
||||
)
|
||||
|
||||
def copy(self) -> "PerConnectionState":
|
||||
@@ -859,15 +482,6 @@ class PerConnectionState:
|
||||
rooms=self.rooms.copy(),
|
||||
receipts=self.receipts.copy(),
|
||||
room_configs=dict(self.room_configs),
|
||||
list_to_rooms=dict(self.list_to_rooms),
|
||||
)
|
||||
|
||||
def __len__(self) -> int:
|
||||
return (
|
||||
len(self.rooms)
|
||||
+ len(self.receipts)
|
||||
+ len(self.room_configs)
|
||||
+ len(self.list_to_rooms)
|
||||
)
|
||||
|
||||
|
||||
@@ -880,20 +494,13 @@ class MutablePerConnectionState(PerConnectionState):
|
||||
|
||||
room_configs: typing.ChainMap[str, RoomSyncConfig]
|
||||
|
||||
list_to_rooms: typing.ChainMap[str, Set[str]]
|
||||
|
||||
def has_updates(self) -> bool:
|
||||
return (
|
||||
bool(self.rooms.get_updates())
|
||||
or bool(self.receipts.get_updates())
|
||||
or bool(self.get_room_config_updates())
|
||||
or bool(self.list_to_rooms.maps[0])
|
||||
)
|
||||
|
||||
def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
|
||||
"""Get updates to the room sync config"""
|
||||
return self.room_configs.maps[0]
|
||||
|
||||
def get_list_to_rooms_updates(self) -> Mapping[str, StrCollection]:
|
||||
"""Get updates to the `list_to_rooms`"""
|
||||
return self.list_to_rooms.maps[0]
|
||||
@@ -464,6 +464,8 @@ class MatrixFederationHttpClient:
|
||||
self.max_long_retries = hs.config.federation.max_long_retries
|
||||
self.max_short_retries = hs.config.federation.max_short_retries
|
||||
|
||||
self.max_download_size = hs.config.media.max_upload_size
|
||||
|
||||
self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))
|
||||
|
||||
self._sleeper = AwakenableSleeper(self.reactor)
|
||||
@@ -1756,8 +1758,10 @@ class MatrixFederationHttpClient:
|
||||
request.destination,
|
||||
str_url,
|
||||
)
|
||||
# We don't know how large the response will be upfront, so limit it to
|
||||
# the `max_upload_size` config value.
|
||||
length, headers, _, _ = await self._simple_http_client.get_file(
|
||||
str_url, output_stream, expected_size
|
||||
str_url, output_stream, self.max_download_size
|
||||
)
|
||||
|
||||
logger.info(
|
||||
|
||||
@@ -502,15 +502,8 @@ class EventsPersistenceStorageController:
|
||||
"""
|
||||
state = await self._calculate_current_state(room_id)
|
||||
delta = await self._calculate_state_delta(room_id, state)
|
||||
sliding_sync_table_changes = (
|
||||
await self.persist_events_store._calculate_sliding_sync_table_changes(
|
||||
room_id, [], delta
|
||||
)
|
||||
)
|
||||
|
||||
await self.persist_events_store.update_current_state(
|
||||
room_id, delta, sliding_sync_table_changes
|
||||
)
|
||||
await self.persist_events_store.update_current_state(room_id, delta)
|
||||
|
||||
async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
|
||||
"""Calculate the current state of a room, based on the forward extremities
|
||||
|
||||
@@ -35,7 +35,6 @@ from typing import (
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
@@ -65,7 +64,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.background_updates import BackgroundUpdater
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
from synapse.storage.types import Connection, Cursor, SQLQueryParameters
|
||||
from synapse.types import StrCollection
|
||||
from synapse.util.async_helpers import delay_cancellation
|
||||
from synapse.util.iterutils import batch_iter
|
||||
|
||||
@@ -1097,50 +1095,6 @@ class DatabasePool:
|
||||
|
||||
txn.execute(sql, vals)
|
||||
|
||||
@staticmethod
|
||||
def simple_insert_returning_txn(
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
values: Dict[str, Any],
|
||||
returning: StrCollection,
|
||||
) -> Tuple[Any, ...]:
|
||||
"""Executes a `INSERT INTO... RETURNING...` statement (or equivalent for
|
||||
SQLite versions that don't support it).
|
||||
"""
|
||||
|
||||
if txn.database_engine.supports_returning:
|
||||
keys, vals = zip(*values.items())
|
||||
|
||||
sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % (
|
||||
table,
|
||||
", ".join(k for k in keys),
|
||||
", ".join("?" for _ in keys),
|
||||
", ".join(k for k in returning),
|
||||
)
|
||||
|
||||
txn.execute(sql, vals)
|
||||
row = txn.fetchone()
|
||||
assert row is not None
|
||||
return row
|
||||
else:
|
||||
# For old versions of SQLite we do a standard insert and then can
|
||||
# use `last_insert_rowid` to get at the row we just inserted
|
||||
DatabasePool.simple_insert_txn(
|
||||
txn,
|
||||
table=table,
|
||||
values=values,
|
||||
)
|
||||
txn.execute("SELECT last_insert_rowid()")
|
||||
row = txn.fetchone()
|
||||
assert row is not None
|
||||
(rowid,) = row
|
||||
|
||||
row = DatabasePool.simple_select_one_txn(
|
||||
txn, table=table, keyvalues={"rowid": rowid}, retcols=returning
|
||||
)
|
||||
assert row is not None
|
||||
return row
|
||||
|
||||
async def simple_insert_many(
|
||||
self,
|
||||
table: str,
|
||||
@@ -1300,9 +1254,9 @@ class DatabasePool:
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Mapping[str, Any],
|
||||
values: Mapping[str, Any],
|
||||
insertion_values: Optional[Mapping[str, Any]] = None,
|
||||
keyvalues: Dict[str, Any],
|
||||
values: Dict[str, Any],
|
||||
insertion_values: Optional[Dict[str, Any]] = None,
|
||||
where_clause: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
@@ -1345,9 +1299,9 @@ class DatabasePool:
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Mapping[str, Any],
|
||||
values: Mapping[str, Any],
|
||||
insertion_values: Optional[Mapping[str, Any]] = None,
|
||||
keyvalues: Dict[str, Any],
|
||||
values: Dict[str, Any],
|
||||
insertion_values: Optional[Dict[str, Any]] = None,
|
||||
where_clause: Optional[str] = None,
|
||||
lock: bool = True,
|
||||
) -> bool:
|
||||
@@ -1368,7 +1322,7 @@ class DatabasePool:
|
||||
|
||||
if lock:
|
||||
# We need to lock the table :(
|
||||
txn.database_engine.lock_table(txn, table)
|
||||
self.engine.lock_table(txn, table)
|
||||
|
||||
def _getwhere(key: str) -> str:
|
||||
# If the value we're passing in is None (aka NULL), we need to use
|
||||
@@ -1422,13 +1376,13 @@ class DatabasePool:
|
||||
# successfully inserted
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def simple_upsert_txn_native_upsert(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Mapping[str, Any],
|
||||
values: Mapping[str, Any],
|
||||
insertion_values: Optional[Mapping[str, Any]] = None,
|
||||
keyvalues: Dict[str, Any],
|
||||
values: Dict[str, Any],
|
||||
insertion_values: Optional[Dict[str, Any]] = None,
|
||||
where_clause: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
@@ -1581,8 +1535,8 @@ class DatabasePool:
|
||||
|
||||
self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)
|
||||
|
||||
@staticmethod
|
||||
def simple_upsert_many_txn_native_upsert(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
key_names: Collection[str],
|
||||
@@ -2012,8 +1966,8 @@ class DatabasePool:
|
||||
def simple_update_txn(
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Mapping[str, Any],
|
||||
updatevalues: Mapping[str, Any],
|
||||
keyvalues: Dict[str, Any],
|
||||
updatevalues: Dict[str, Any],
|
||||
) -> int:
|
||||
"""
|
||||
Update rows in the given database table.
|
||||
|
||||
@@ -33,7 +33,6 @@ from synapse.storage.database import (
|
||||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
|
||||
from synapse.storage.databases.main.stats import UserSortOrder
|
||||
from synapse.storage.engines import BaseDatabaseEngine
|
||||
from synapse.storage.types import Cursor
|
||||
@@ -157,7 +156,6 @@ class DataStore(
|
||||
LockStore,
|
||||
SessionStore,
|
||||
TaskSchedulerWorkerStore,
|
||||
SlidingSyncStore,
|
||||
):
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
@@ -313,8 +313,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
"get_unread_event_push_actions_by_room_for_user", (room_id,)
|
||||
)
|
||||
|
||||
self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,))
|
||||
|
||||
# The `_get_membership_from_event_id` is immutable, except for the
|
||||
# case where we look up an event *before* persisting it.
|
||||
self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -457,8 +457,6 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
) -> Optional[EventBase]:
|
||||
"""Get an event from the database by event_id.
|
||||
|
||||
Events for unknown room versions will also be filtered out.
|
||||
|
||||
Args:
|
||||
event_id: The event_id of the event to fetch
|
||||
|
||||
@@ -513,10 +511,6 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
) -> Dict[str, EventBase]:
|
||||
"""Get events from the database
|
||||
|
||||
Unknown events will be omitted from the response.
|
||||
|
||||
Events for unknown room versions will also be filtered out.
|
||||
|
||||
Args:
|
||||
event_ids: The event_ids of the events to fetch
|
||||
|
||||
@@ -559,8 +553,6 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
Unknown events will be omitted from the response.
|
||||
|
||||
Events for unknown room versions will also be filtered out.
|
||||
|
||||
Args:
|
||||
event_ids: The event_ids of the events to fetch
|
||||
|
||||
|
||||
@@ -454,10 +454,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
# so must be deleted first.
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
# Note: the sliding_sync_ tables have foreign keys to the `events` table
|
||||
# so must be deleted first.
|
||||
"sliding_sync_joined_rooms",
|
||||
"sliding_sync_membership_snapshots",
|
||||
"events",
|
||||
"federation_inbound_events_staging",
|
||||
"receipts_graph",
|
||||
|
||||
@@ -1337,12 +1337,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
keyvalues={"user_id": user_id, "room_id": room_id},
|
||||
updatevalues={"forgotten": 1},
|
||||
)
|
||||
self.db_pool.simple_update_txn(
|
||||
txn,
|
||||
table="sliding_sync_membership_snapshots",
|
||||
keyvalues={"user_id": user_id, "room_id": room_id},
|
||||
updatevalues={"forgotten": 1},
|
||||
)
|
||||
|
||||
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
|
||||
self._invalidate_cache_and_stream(
|
||||
|
||||
@@ -1,506 +0,0 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
|
||||
|
||||
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, cast
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.api.errors import SlidingSyncUnknownPosition
|
||||
from synapse.logging.opentracing import log_kv
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.types import MultiWriterStreamToken, RoomStreamToken
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
HaveSentRoom,
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
PerConnectionState,
|
||||
RoomStatusMap,
|
||||
RoomSyncConfig,
|
||||
)
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.storage.databases.main import DataStore
|
||||
|
||||
|
||||
class SlidingSyncStore(SQLBaseStore):
|
||||
async def persist_per_connection_state(
|
||||
self,
|
||||
user_id: str,
|
||||
device_id: str,
|
||||
conn_id: str,
|
||||
previous_connection_position: Optional[int],
|
||||
per_connection_state: "MutablePerConnectionState",
|
||||
) -> int:
|
||||
"""Persist updates to the per-connection state for a sliding sync
|
||||
connection.
|
||||
|
||||
Returns:
|
||||
The connection position of the newly persisted state.
|
||||
"""
|
||||
|
||||
store = cast("DataStore", self)
|
||||
return await self.db_pool.runInteraction(
|
||||
"persist_per_connection_state",
|
||||
self.persist_per_connection_state_txn,
|
||||
user_id=user_id,
|
||||
device_id=device_id,
|
||||
conn_id=conn_id,
|
||||
previous_connection_position=previous_connection_position,
|
||||
per_connection_state=await PerConnectionStateDB.from_state(
|
||||
per_connection_state, store
|
||||
),
|
||||
)
|
||||
|
||||
def persist_per_connection_state_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
user_id: str,
|
||||
device_id: str,
|
||||
conn_id: str,
|
||||
previous_connection_position: Optional[int],
|
||||
per_connection_state: "PerConnectionStateDB",
|
||||
) -> int:
|
||||
# First we fetch the (or create) the connection key associated with the
|
||||
# previous connection position.
|
||||
if previous_connection_position is not None:
|
||||
# The `previous_connection_position` is a user-supplied value, so we
|
||||
# need to make sure that the one they supplied is actually theirs.
|
||||
sql = """
|
||||
SELECT connection_key
|
||||
FROM sliding_sync_connection_positions
|
||||
INNER JOIN sliding_sync_connections USING (connection_key)
|
||||
WHERE
|
||||
connection_position = ?
|
||||
AND user_id = ? AND device_id = ? AND conn_id = ?
|
||||
"""
|
||||
txn.execute(
|
||||
sql, (previous_connection_position, user_id, device_id, conn_id)
|
||||
)
|
||||
row = txn.fetchone()
|
||||
if row is None:
|
||||
raise SlidingSyncUnknownPosition()
|
||||
|
||||
(connection_key,) = row
|
||||
else:
|
||||
# We're restarting the connection, so we clear all existing
|
||||
# connections. We do this here to ensure that if we get lots of
|
||||
# one-shot requests we don't stack up lots of entries.
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn,
|
||||
table="sliding_sync_connections",
|
||||
keyvalues={
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
"conn_id": conn_id,
|
||||
},
|
||||
)
|
||||
|
||||
(connection_key,) = self.db_pool.simple_insert_returning_txn(
|
||||
txn,
|
||||
table="sliding_sync_connections",
|
||||
values={
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
"conn_id": conn_id,
|
||||
"created_ts": self._clock.time_msec(),
|
||||
},
|
||||
returning=("connection_key",),
|
||||
)
|
||||
|
||||
# Define a new connection position for the updates
|
||||
(connection_position,) = self.db_pool.simple_insert_returning_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_positions",
|
||||
values={
|
||||
"connection_key": connection_key,
|
||||
"created_ts": self._clock.time_msec(),
|
||||
},
|
||||
returning=("connection_position",),
|
||||
)
|
||||
|
||||
# We need to deduplicate the `required_state` JSON. We do this by
|
||||
# fetching all JSON associated with the connection and comparing that
|
||||
# with the updates to `required_state`
|
||||
|
||||
# Dict from required state json -> required state ID
|
||||
required_state_to_id: Dict[str, int] = {}
|
||||
if previous_connection_position is not None:
|
||||
rows = self.db_pool.simple_select_list_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_required_state",
|
||||
keyvalues={"connection_key": connection_key},
|
||||
retcols=("required_state_id", "required_state"),
|
||||
)
|
||||
for required_state_id, required_state in rows:
|
||||
required_state_to_id[required_state] = required_state_id
|
||||
|
||||
room_to_state_ids: Dict[str, int] = {}
|
||||
unique_required_state: Dict[str, List[str]] = {}
|
||||
for room_id, room_state in per_connection_state.room_configs.items():
|
||||
serialized_state = json_encoder.encode(
|
||||
# We store the required state as a sorted list of event type /
|
||||
# state key tuples.
|
||||
sorted(
|
||||
(event_type, state_key)
|
||||
for event_type, state_keys in room_state.required_state_map.items()
|
||||
for state_key in state_keys
|
||||
)
|
||||
)
|
||||
|
||||
existing_state_id = required_state_to_id.get(serialized_state)
|
||||
if existing_state_id is not None:
|
||||
room_to_state_ids[room_id] = existing_state_id
|
||||
else:
|
||||
unique_required_state.setdefault(serialized_state, []).append(room_id)
|
||||
|
||||
# Insert any new `required_state` json we haven't previously seen.
|
||||
for serialized_required_state, room_ids in unique_required_state.items():
|
||||
(required_state_id,) = self.db_pool.simple_insert_returning_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_required_state",
|
||||
values={
|
||||
"connection_key": connection_key,
|
||||
"required_state": serialized_required_state,
|
||||
},
|
||||
returning=("required_state_id",),
|
||||
)
|
||||
for room_id in room_ids:
|
||||
room_to_state_ids[room_id] = required_state_id
|
||||
|
||||
# Copy over state from the previous connection position (we'll overwrite
|
||||
# these rows with any changes).
|
||||
if previous_connection_position is not None:
|
||||
sql = """
|
||||
INSERT INTO sliding_sync_connection_streams
|
||||
(connection_position, stream, room_id, room_status, last_position)
|
||||
SELECT ?, stream, room_id, room_status, last_position
|
||||
FROM sliding_sync_connection_streams
|
||||
WHERE connection_position = ?
|
||||
"""
|
||||
txn.execute(sql, (connection_position, previous_connection_position))
|
||||
|
||||
sql = """
|
||||
INSERT INTO sliding_sync_connection_room_configs
|
||||
(connection_position, room_id, timeline_limit, required_state_id)
|
||||
SELECT ?, room_id, timeline_limit, required_state_id
|
||||
FROM sliding_sync_connection_room_configs
|
||||
WHERE connection_position = ?
|
||||
"""
|
||||
txn.execute(sql, (connection_position, previous_connection_position))
|
||||
|
||||
# We now upsert the changes to the various streams.
|
||||
key_values = []
|
||||
value_values = []
|
||||
for room_id, have_sent_room in per_connection_state.rooms._statuses.items():
|
||||
key_values.append((connection_position, "rooms", room_id))
|
||||
value_values.append(
|
||||
(have_sent_room.status.value, have_sent_room.last_token)
|
||||
)
|
||||
|
||||
for room_id, have_sent_room in per_connection_state.receipts._statuses.items():
|
||||
key_values.append((connection_position, "receipts", room_id))
|
||||
value_values.append(
|
||||
(have_sent_room.status.value, have_sent_room.last_token)
|
||||
)
|
||||
|
||||
self.db_pool.simple_upsert_many_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_streams",
|
||||
key_names=(
|
||||
"connection_position",
|
||||
"stream",
|
||||
"room_id",
|
||||
),
|
||||
key_values=key_values,
|
||||
value_names=(
|
||||
"room_status",
|
||||
"last_position",
|
||||
),
|
||||
value_values=value_values,
|
||||
)
|
||||
|
||||
# ... and upsert changes to the room configs.
|
||||
keys = []
|
||||
values = []
|
||||
for room_id, room_config in per_connection_state.room_configs.items():
|
||||
keys.append((connection_position, room_id))
|
||||
values.append((room_config.timeline_limit, room_to_state_ids[room_id]))
|
||||
|
||||
self.db_pool.simple_upsert_many_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_room_configs",
|
||||
key_names=(
|
||||
"connection_position",
|
||||
"room_id",
|
||||
),
|
||||
key_values=keys,
|
||||
value_names=(
|
||||
"timeline_limit",
|
||||
"required_state_id",
|
||||
),
|
||||
value_values=values,
|
||||
)
|
||||
|
||||
# Persist changes to the room lists
|
||||
for list_name, list_room_ids in per_connection_state.list_to_rooms.items():
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_room_lists",
|
||||
keyvalues={"connection_key": connection_key, "list_name": list_name},
|
||||
)
|
||||
self.db_pool.simple_insert_many_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_room_lists",
|
||||
keys=("connection_key", "list_name", "room_id"),
|
||||
values=[
|
||||
(connection_key, list_name, room_id) for room_id in list_room_ids
|
||||
],
|
||||
)
|
||||
|
||||
return connection_position
|
||||
|
||||
@cached(iterable=True, max_entries=100000)
|
||||
async def get_per_connection_state(
|
||||
self, user_id: str, device_id: str, conn_id: str, connection_position: int
|
||||
) -> "PerConnectionState":
|
||||
"""Get the per-connection state for the given connection position."""
|
||||
|
||||
per_connection_state_db = await self.db_pool.runInteraction(
|
||||
"get_per_connection_state",
|
||||
self._get_per_connection_state_txn,
|
||||
user_id=user_id,
|
||||
device_id=device_id,
|
||||
conn_id=conn_id,
|
||||
connection_position=connection_position,
|
||||
)
|
||||
store = cast("DataStore", self)
|
||||
return await per_connection_state_db.to_state(store)
|
||||
|
||||
def _get_per_connection_state_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
user_id: str,
|
||||
device_id: str,
|
||||
conn_id: str,
|
||||
connection_position: int,
|
||||
) -> "PerConnectionStateDB":
|
||||
# The `previous_connection_position` is a user-supplied value, so we
|
||||
# need to make sure that the one they supplied is actually theirs.
|
||||
sql = """
|
||||
SELECT connection_key
|
||||
FROM sliding_sync_connection_positions
|
||||
INNER JOIN sliding_sync_connections USING (connection_key)
|
||||
WHERE
|
||||
connection_position = ?
|
||||
AND user_id = ? AND device_id = ? AND conn_id = ?
|
||||
"""
|
||||
txn.execute(sql, (connection_position, user_id, device_id, conn_id))
|
||||
row = txn.fetchone()
|
||||
if row is None:
|
||||
raise SlidingSyncUnknownPosition()
|
||||
|
||||
(connection_key,) = row
|
||||
|
||||
# Now that we have seen the client has received and used the connection
|
||||
# position, we can delete all the other connection positions.
|
||||
sql = """
|
||||
DELETE FROM sliding_sync_connection_positions
|
||||
WHERE connection_key = ? AND connection_position != ?
|
||||
"""
|
||||
txn.execute(sql, (connection_key, connection_position))
|
||||
|
||||
# Fetch and create a mapping from required state ID to the actual
|
||||
# required state for the connection.
|
||||
rows = self.db_pool.simple_select_list_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_required_state",
|
||||
keyvalues={"connection_key": connection_key},
|
||||
retcols=(
|
||||
"required_state_id",
|
||||
"required_state",
|
||||
),
|
||||
)
|
||||
|
||||
required_state_map: Dict[int, Dict[str, Set[str]]] = {}
|
||||
for row in rows:
|
||||
state = required_state_map[row[0]] = {}
|
||||
for event_type, state_keys in db_to_json(row[1]):
|
||||
state[event_type] = set(state_keys)
|
||||
|
||||
# Get all the room configs, looking up the required state from the map
|
||||
# above.
|
||||
room_config_rows = self.db_pool.simple_select_list_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_room_configs",
|
||||
keyvalues={"connection_position": connection_position},
|
||||
retcols=(
|
||||
"room_id",
|
||||
"timeline_limit",
|
||||
"required_state_id",
|
||||
),
|
||||
)
|
||||
|
||||
room_configs: Dict[str, RoomSyncConfig] = {}
|
||||
for (
|
||||
room_id,
|
||||
timeline_limit,
|
||||
required_state_id,
|
||||
) in room_config_rows:
|
||||
room_configs[room_id] = RoomSyncConfig(
|
||||
timeline_limit=timeline_limit,
|
||||
required_state_map=required_state_map[required_state_id],
|
||||
)
|
||||
|
||||
# Now look up the per-room stream data.
|
||||
rooms: Dict[str, HaveSentRoom[str]] = {}
|
||||
receipts: Dict[str, HaveSentRoom[str]] = {}
|
||||
|
||||
receipt_rows = self.db_pool.simple_select_list_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_streams",
|
||||
keyvalues={"connection_position": connection_position},
|
||||
retcols=(
|
||||
"stream",
|
||||
"room_id",
|
||||
"room_status",
|
||||
"last_position",
|
||||
),
|
||||
)
|
||||
for stream, room_id, room_status, last_position in receipt_rows:
|
||||
have_sent_room: HaveSentRoom[str] = HaveSentRoom(
|
||||
status=HaveSentRoomFlag(room_status), last_token=last_position
|
||||
)
|
||||
if stream == "rooms":
|
||||
rooms[room_id] = have_sent_room
|
||||
elif stream == "receipts":
|
||||
receipts[room_id] = have_sent_room
|
||||
|
||||
# Fetch any stored lists for the connection
|
||||
rows = self.db_pool.simple_select_list_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_room_lists",
|
||||
keyvalues={
|
||||
connection_key: connection_key,
|
||||
},
|
||||
retcols=("list_name", "room_id"),
|
||||
)
|
||||
list_to_rooms: Dict[str, Set[str]] = {}
|
||||
for list_name, room_id in rows:
|
||||
list_to_rooms.setdefault(list_name, set()).add(room_id)
|
||||
|
||||
return PerConnectionStateDB(
|
||||
rooms=RoomStatusMap(rooms),
|
||||
receipts=RoomStatusMap(receipts),
|
||||
room_configs=room_configs,
|
||||
list_to_rooms=list_to_rooms,
|
||||
)
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True, frozen=True)
|
||||
class PerConnectionStateDB:
|
||||
"""An equivalent to `PerConnectionState` that holds data in a format stored
|
||||
in the DB.
|
||||
|
||||
The principle difference is that the tokens for the different streams are
|
||||
serialized to strings.
|
||||
|
||||
When persisting this *only* contains updates to the state.
|
||||
"""
|
||||
|
||||
rooms: "RoomStatusMap[str]"
|
||||
receipts: "RoomStatusMap[str]"
|
||||
|
||||
room_configs: Mapping[str, "RoomSyncConfig"]
|
||||
list_to_rooms: Mapping[str, AbstractSet[str]]
|
||||
|
||||
@staticmethod
|
||||
async def from_state(
|
||||
per_connection_state: "MutablePerConnectionState", store: "DataStore"
|
||||
) -> "PerConnectionStateDB":
|
||||
"""Convert from a standard `PerConnectionState`"""
|
||||
rooms = {
|
||||
room_id: HaveSentRoom(
|
||||
status=status.status,
|
||||
last_token=(
|
||||
await status.last_token.to_string(store)
|
||||
if status.last_token is not None
|
||||
else None
|
||||
),
|
||||
)
|
||||
for room_id, status in per_connection_state.rooms.get_updates().items()
|
||||
}
|
||||
|
||||
receipts = {
|
||||
room_id: HaveSentRoom(
|
||||
status=status.status,
|
||||
last_token=(
|
||||
await status.last_token.to_string(store)
|
||||
if status.last_token is not None
|
||||
else None
|
||||
),
|
||||
)
|
||||
for room_id, status in per_connection_state.receipts.get_updates().items()
|
||||
}
|
||||
|
||||
log_kv(
|
||||
{
|
||||
"rooms": rooms,
|
||||
"receipts": receipts,
|
||||
"room_configs": per_connection_state.room_configs.maps[0],
|
||||
}
|
||||
)
|
||||
|
||||
return PerConnectionStateDB(
|
||||
rooms=RoomStatusMap(rooms),
|
||||
receipts=RoomStatusMap(receipts),
|
||||
room_configs=per_connection_state.room_configs.maps[0],
|
||||
list_to_rooms=per_connection_state.list_to_rooms.maps[0],
|
||||
)
|
||||
|
||||
async def to_state(self, store: "DataStore") -> "PerConnectionState":
|
||||
"""Convert into a standard `PerConnectionState`"""
|
||||
rooms = {
|
||||
room_id: HaveSentRoom(
|
||||
status=status.status,
|
||||
last_token=(
|
||||
await RoomStreamToken.parse(store, status.last_token)
|
||||
if status.last_token is not None
|
||||
else None
|
||||
),
|
||||
)
|
||||
for room_id, status in self.rooms._statuses.items()
|
||||
}
|
||||
|
||||
receipts = {
|
||||
room_id: HaveSentRoom(
|
||||
status=status.status,
|
||||
last_token=(
|
||||
await MultiWriterStreamToken.parse(store, status.last_token)
|
||||
if status.last_token is not None
|
||||
else None
|
||||
),
|
||||
)
|
||||
for room_id, status in self.receipts._statuses.items()
|
||||
}
|
||||
|
||||
return PerConnectionState(
|
||||
rooms=RoomStatusMap(rooms),
|
||||
receipts=RoomStatusMap(receipts),
|
||||
room_configs=self.room_configs,
|
||||
list_to_rooms=self.list_to_rooms,
|
||||
)
|
||||
@@ -161,80 +161,45 @@ class StateDeltasStore(SQLBaseStore):
|
||||
self._get_max_stream_id_in_current_state_deltas_txn,
|
||||
)
|
||||
|
||||
def get_current_state_deltas_for_room_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
room_id: str,
|
||||
*,
|
||||
from_token: Optional[RoomStreamToken],
|
||||
to_token: Optional[RoomStreamToken],
|
||||
) -> List[StateDelta]:
|
||||
"""
|
||||
Get the state deltas between two tokens.
|
||||
|
||||
(> `from_token` and <= `to_token`)
|
||||
"""
|
||||
from_clause = ""
|
||||
from_args = []
|
||||
if from_token is not None:
|
||||
from_clause = "AND ? < stream_id"
|
||||
from_args = [from_token.stream]
|
||||
|
||||
to_clause = ""
|
||||
to_args = []
|
||||
if to_token is not None:
|
||||
to_clause = "AND stream_id <= ?"
|
||||
to_args = [to_token.get_max_stream_pos()]
|
||||
|
||||
sql = f"""
|
||||
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
|
||||
FROM current_state_delta_stream
|
||||
WHERE room_id = ? {from_clause} {to_clause}
|
||||
ORDER BY stream_id ASC
|
||||
"""
|
||||
txn.execute(sql, [room_id] + from_args + to_args)
|
||||
|
||||
return [
|
||||
StateDelta(
|
||||
stream_id=row[1],
|
||||
room_id=room_id,
|
||||
event_type=row[2],
|
||||
state_key=row[3],
|
||||
event_id=row[4],
|
||||
prev_event_id=row[5],
|
||||
)
|
||||
for row in txn
|
||||
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||
]
|
||||
|
||||
@trace
|
||||
async def get_current_state_deltas_for_room(
|
||||
self,
|
||||
room_id: str,
|
||||
*,
|
||||
from_token: Optional[RoomStreamToken],
|
||||
to_token: Optional[RoomStreamToken],
|
||||
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
|
||||
) -> List[StateDelta]:
|
||||
"""
|
||||
Get the state deltas between two tokens.
|
||||
"""Get the state deltas between two tokens."""
|
||||
|
||||
(> `from_token` and <= `to_token`)
|
||||
"""
|
||||
|
||||
if (
|
||||
from_token is not None
|
||||
and not self._curr_state_delta_stream_cache.has_entity_changed(
|
||||
room_id, from_token.stream
|
||||
)
|
||||
if not self._curr_state_delta_stream_cache.has_entity_changed(
|
||||
room_id, from_token.stream
|
||||
):
|
||||
return []
|
||||
|
||||
def get_current_state_deltas_for_room_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[StateDelta]:
|
||||
sql = """
|
||||
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
|
||||
FROM current_state_delta_stream
|
||||
WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
|
||||
ORDER BY stream_id ASC
|
||||
"""
|
||||
txn.execute(
|
||||
sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
|
||||
)
|
||||
|
||||
return [
|
||||
StateDelta(
|
||||
stream_id=row[1],
|
||||
room_id=room_id,
|
||||
event_type=row[2],
|
||||
state_key=row[3],
|
||||
event_id=row[4],
|
||||
prev_event_id=row[5],
|
||||
)
|
||||
for row in txn
|
||||
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||
]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_current_state_deltas_for_room",
|
||||
self.get_current_state_deltas_for_room_txn,
|
||||
room_id,
|
||||
from_token=from_token,
|
||||
to_token=to_token,
|
||||
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
|
||||
)
|
||||
|
||||
@trace
|
||||
|
||||
@@ -50,7 +50,6 @@ from typing import (
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Protocol,
|
||||
Set,
|
||||
@@ -81,7 +80,7 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.caches.descriptors import cached
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.cancellation import cancellable
|
||||
from synapse.util.iterutils import batch_iter
|
||||
@@ -1264,76 +1263,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
|
||||
return None
|
||||
|
||||
async def get_last_event_pos_in_room(
|
||||
self,
|
||||
room_id: str,
|
||||
event_types: Optional[StrCollection] = None,
|
||||
) -> Optional[Tuple[str, PersistedEventPosition]]:
|
||||
"""
|
||||
Returns the ID and event position of the last event in a room.
|
||||
|
||||
Based on `get_last_event_pos_in_room_before_stream_ordering(...)`
|
||||
|
||||
Args:
|
||||
room_id
|
||||
event_types: Optional allowlist of event types to filter by
|
||||
|
||||
Returns:
|
||||
The ID of the most recent event and it's position, or None if there are no
|
||||
events in the room that match the given event types.
|
||||
"""
|
||||
|
||||
def _get_last_event_pos_in_room_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Optional[Tuple[str, PersistedEventPosition]]:
|
||||
event_type_clause = ""
|
||||
event_type_args: List[str] = []
|
||||
if event_types is not None and len(event_types) > 0:
|
||||
event_type_clause, event_type_args = make_in_list_sql_clause(
|
||||
txn.database_engine, "type", event_types
|
||||
)
|
||||
event_type_clause = f"AND {event_type_clause}"
|
||||
|
||||
sql = f"""
|
||||
SELECT event_id, stream_ordering, instance_name
|
||||
FROM events
|
||||
LEFT JOIN rejections USING (event_id)
|
||||
WHERE room_id = ?
|
||||
{event_type_clause}
|
||||
AND NOT outlier
|
||||
AND rejections.event_id IS NULL
|
||||
ORDER BY stream_ordering DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
|
||||
txn.execute(
|
||||
sql,
|
||||
[room_id] + event_type_args,
|
||||
)
|
||||
|
||||
row = cast(Optional[Tuple[str, int, str]], txn.fetchone())
|
||||
if row is not None:
|
||||
event_id, stream_ordering, instance_name = row
|
||||
|
||||
return event_id, PersistedEventPosition(
|
||||
# If instance_name is null we default to "master"
|
||||
instance_name or "master",
|
||||
stream_ordering,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_last_event_pos_in_room",
|
||||
_get_last_event_pos_in_room_txn,
|
||||
)
|
||||
|
||||
@trace
|
||||
async def get_last_event_pos_in_room_before_stream_ordering(
|
||||
self,
|
||||
room_id: str,
|
||||
end_token: RoomStreamToken,
|
||||
event_types: Optional[StrCollection] = None,
|
||||
event_types: Optional[Collection[str]] = None,
|
||||
) -> Optional[Tuple[str, PersistedEventPosition]]:
|
||||
"""
|
||||
Returns the ID and event position of the last event in a room at or before a
|
||||
@@ -1447,6 +1382,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
"""
|
||||
|
||||
min_token = end_token.stream
|
||||
max_token = end_token.get_max_stream_pos()
|
||||
results: Dict[str, int] = {}
|
||||
|
||||
# First, we check for the rooms in the stream change cache to see if we
|
||||
@@ -1459,76 +1395,26 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
else:
|
||||
missing_room_ids.add(room_id)
|
||||
|
||||
if not missing_room_ids:
|
||||
return results
|
||||
|
||||
# Next, we query the stream position from the DB. At first we fetch all
|
||||
# positions less than the *max* stream pos in the token, then filter
|
||||
# them down. We do this as a) this is a cheaper query, and b) the vast
|
||||
# majority of rooms will have a latest token from before the min stream
|
||||
# pos.
|
||||
|
||||
uncapped_results = await self._bulk_get_max_event_pos(missing_room_ids)
|
||||
|
||||
# Check that the stream position for the rooms are from before the
|
||||
# minimum position of the token. If not then we need to fetch more
|
||||
# rows.
|
||||
recheck_rooms: Set[str] = set()
|
||||
for room_id, stream in uncapped_results.items():
|
||||
if stream <= min_token:
|
||||
results[room_id] = stream
|
||||
else:
|
||||
recheck_rooms.add(room_id)
|
||||
|
||||
if not recheck_rooms:
|
||||
return results
|
||||
|
||||
for room_id in recheck_rooms:
|
||||
result = await self.get_last_event_pos_in_room_before_stream_ordering(
|
||||
room_id, end_token
|
||||
)
|
||||
if result is not None:
|
||||
results[room_id] = result[1].stream
|
||||
|
||||
return results
|
||||
|
||||
@cached()
|
||||
async def _get_max_event_pos(self, room_id: str) -> int:
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedList(cached_method_name="_get_max_event_pos", list_name="room_ids")
|
||||
async def _bulk_get_max_event_pos(
|
||||
self, room_ids: StrCollection
|
||||
) -> Mapping[str, int]:
|
||||
"""Fetch the max position of a persisted event in the room."""
|
||||
|
||||
now_token = self.get_room_max_token()
|
||||
max_pos = now_token.get_max_stream_pos()
|
||||
|
||||
results: Dict[str, int] = {}
|
||||
missing_room_ids: Set[str] = set()
|
||||
for room_id in room_ids:
|
||||
stream_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id)
|
||||
if stream_pos is not None:
|
||||
results[room_id] = stream_pos
|
||||
else:
|
||||
missing_room_ids.add(room_id)
|
||||
|
||||
if not missing_room_ids:
|
||||
return results
|
||||
|
||||
def bulk_get_max_event_pos_txn(
|
||||
txn: LoggingTransaction, batched_room_ids: StrCollection
|
||||
def bulk_get_last_event_pos_txn(
|
||||
txn: LoggingTransaction, batch_room_ids: StrCollection
|
||||
) -> Dict[str, int]:
|
||||
# This query fetches the latest stream position in the rooms before
|
||||
# the given max position.
|
||||
clause, args = make_in_list_sql_clause(
|
||||
self.database_engine, "room_id", batched_room_ids
|
||||
self.database_engine, "room_id", batch_room_ids
|
||||
)
|
||||
sql = f"""
|
||||
SELECT room_id, (
|
||||
SELECT stream_ordering FROM events AS e
|
||||
LEFT JOIN rejections USING (event_id)
|
||||
WHERE e.room_id = r.room_id
|
||||
AND e.stream_ordering <= ?
|
||||
AND stream_ordering <= ?
|
||||
AND NOT outlier
|
||||
AND rejection_reason IS NULL
|
||||
ORDER BY stream_ordering DESC
|
||||
@@ -1537,26 +1423,72 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
FROM rooms AS r
|
||||
WHERE {clause}
|
||||
"""
|
||||
txn.execute(sql, [max_pos] + args)
|
||||
txn.execute(sql, [max_token] + args)
|
||||
return {row[0]: row[1] for row in txn}
|
||||
|
||||
recheck_rooms: Set[str] = set()
|
||||
for batched in batch_iter(room_ids, 1000):
|
||||
batch_results = await self.db_pool.runInteraction(
|
||||
"_bulk_get_max_event_pos", bulk_get_max_event_pos_txn, batched
|
||||
for batched in batch_iter(missing_room_ids, 1000):
|
||||
result = await self.db_pool.runInteraction(
|
||||
"bulk_get_last_event_pos_in_room_before_stream_ordering",
|
||||
bulk_get_last_event_pos_txn,
|
||||
batched,
|
||||
)
|
||||
for room_id, stream_ordering in batch_results.items():
|
||||
if stream_ordering <= now_token.stream:
|
||||
results.update(batch_results)
|
||||
|
||||
# Check that the stream position for the rooms are from before the
|
||||
# minimum position of the token. If not then we need to fetch more
|
||||
# rows.
|
||||
for room_id, stream in result.items():
|
||||
if stream <= min_token:
|
||||
results[room_id] = stream
|
||||
else:
|
||||
recheck_rooms.add(room_id)
|
||||
|
||||
for room_id in recheck_rooms:
|
||||
result = await self.get_last_event_pos_in_room_before_stream_ordering(
|
||||
room_id, now_token
|
||||
if not recheck_rooms:
|
||||
return results
|
||||
|
||||
# For the remaining rooms we need to fetch all rows between the min and
|
||||
# max stream positions in the end token, and filter out the rows that
|
||||
# are after the end token.
|
||||
#
|
||||
# This query should be fast as the range between the min and max should
|
||||
# be small.
|
||||
|
||||
def bulk_get_last_event_pos_recheck_txn(
|
||||
txn: LoggingTransaction, batch_room_ids: StrCollection
|
||||
) -> Dict[str, int]:
|
||||
clause, args = make_in_list_sql_clause(
|
||||
self.database_engine, "room_id", batch_room_ids
|
||||
)
|
||||
if result is not None:
|
||||
results[room_id] = result[1].stream
|
||||
sql = f"""
|
||||
SELECT room_id, instance_name, stream_ordering
|
||||
FROM events
|
||||
WHERE ? < stream_ordering AND stream_ordering <= ?
|
||||
AND NOT outlier
|
||||
AND rejection_reason IS NULL
|
||||
AND {clause}
|
||||
ORDER BY stream_ordering ASC
|
||||
"""
|
||||
txn.execute(sql, [min_token, max_token] + args)
|
||||
|
||||
# We take the max stream ordering that is less than the token. Since
|
||||
# we ordered by stream ordering we just need to iterate through and
|
||||
# take the last matching stream ordering.
|
||||
txn_results: Dict[str, int] = {}
|
||||
for row in txn:
|
||||
room_id = row[0]
|
||||
event_pos = PersistedEventPosition(row[1], row[2])
|
||||
if not event_pos.persisted_after(end_token):
|
||||
txn_results[room_id] = event_pos.stream
|
||||
|
||||
return txn_results
|
||||
|
||||
for batched in batch_iter(recheck_rooms, 1000):
|
||||
recheck_result = await self.db_pool.runInteraction(
|
||||
"bulk_get_last_event_pos_in_room_before_stream_ordering_recheck",
|
||||
bulk_get_last_event_pos_recheck_txn,
|
||||
batched,
|
||||
)
|
||||
results.update(recheck_result)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
@@ -28,11 +28,6 @@ if TYPE_CHECKING:
|
||||
from synapse.storage.database import LoggingDatabaseConnection
|
||||
|
||||
|
||||
# A string that will be replaced with the appropriate auto increment directive
|
||||
# for the database engine, expands to an auto incrementing integer primary key.
|
||||
AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER = "$%AUTO_INCREMENT_PRIMARY_KEY%$"
|
||||
|
||||
|
||||
class IsolationLevel(IntEnum):
|
||||
READ_COMMITTED: int = 1
|
||||
REPEATABLE_READ: int = 2
|
||||
|
||||
@@ -25,7 +25,6 @@ from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Optional, Tuple, cast
|
||||
import psycopg2.extensions
|
||||
|
||||
from synapse.storage.engines._base import (
|
||||
AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER,
|
||||
BaseDatabaseEngine,
|
||||
IncorrectDatabaseSetup,
|
||||
IsolationLevel,
|
||||
@@ -257,10 +256,4 @@ class PostgresEngine(
|
||||
executing the script in its own transaction. The script transaction is
|
||||
left open and it is the responsibility of the caller to commit it.
|
||||
"""
|
||||
# Replace auto increment placeholder with the appropriate directive
|
||||
script = script.replace(
|
||||
AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER,
|
||||
"BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY",
|
||||
)
|
||||
|
||||
cursor.execute(f"COMMIT; BEGIN TRANSACTION; {script}")
|
||||
|
||||
@@ -25,7 +25,6 @@ import threading
|
||||
from typing import TYPE_CHECKING, Any, List, Mapping, Optional
|
||||
|
||||
from synapse.storage.engines import BaseDatabaseEngine
|
||||
from synapse.storage.engines._base import AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER
|
||||
from synapse.storage.types import Cursor
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -169,11 +168,6 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
|
||||
> first. No other implicit transaction control is performed; any transaction
|
||||
> control must be added to sql_script.
|
||||
"""
|
||||
# Replace auto increment placeholder with the appropriate directive
|
||||
script = script.replace(
|
||||
AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER, "INTEGER PRIMARY KEY AUTOINCREMENT"
|
||||
)
|
||||
|
||||
# The implementation of `executescript` can be found at
|
||||
# https://github.com/python/cpython/blob/3.11/Modules/_sqlite/cursor.c#L1035.
|
||||
cursor.executescript(f"BEGIN TRANSACTION; {script}")
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
#
|
||||
#
|
||||
|
||||
SCHEMA_VERSION = 87 # remember to update the list below when updating
|
||||
SCHEMA_VERSION = 86 # remember to update the list below when updating
|
||||
"""Represents the expectations made by the codebase about the database schema
|
||||
|
||||
This should be incremented whenever the codebase changes its requirements on the
|
||||
@@ -142,10 +142,6 @@ Changes in SCHEMA_VERSION = 85
|
||||
|
||||
Changes in SCHEMA_VERSION = 86
|
||||
- Add a column `authenticated` to the tables `local_media_repository` and `remote_media_cache`
|
||||
|
||||
Changes in SCHEMA_VERSION = 87
|
||||
- Add tables to store Sliding Sync data for quick filtering/sorting
|
||||
(`sliding_sync_joined_rooms`, `sliding_sync_membership_snapshots`)
|
||||
"""
|
||||
|
||||
|
||||
|
||||
@@ -1,153 +0,0 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2024 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
-- This table is a list/queue used to keep track of which rooms need to be inserted into
|
||||
-- `sliding_sync_joined_rooms`. We do this to avoid reading from `current_state_events`
|
||||
-- during the background update to populate `sliding_sync_joined_rooms` which works but
|
||||
-- it takes a lot of work for the database to grab `DISTINCT` room_ids given how many
|
||||
-- state events there are for each room.
|
||||
CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms_to_recalculate(
|
||||
room_id TEXT NOT NULL REFERENCES rooms(room_id),
|
||||
PRIMARY KEY (room_id)
|
||||
);
|
||||
|
||||
-- A table for storing room meta data (current state relevant to sliding sync) that the
|
||||
-- local server is still participating in (someone local is joined to the room).
|
||||
--
|
||||
-- We store the joined rooms in separate table from `sliding_sync_membership_snapshots`
|
||||
-- because we need up-to-date information for joined rooms and it can be shared across
|
||||
-- everyone who is joined.
|
||||
--
|
||||
-- This table is kept in sync with `current_state_events` which means if the server is
|
||||
-- no longer participating in a room, the row will be deleted.
|
||||
CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms(
|
||||
room_id TEXT NOT NULL REFERENCES rooms(room_id),
|
||||
-- The `stream_ordering` of the most-recent/latest event in the room
|
||||
event_stream_ordering BIGINT NOT NULL REFERENCES events(stream_ordering),
|
||||
-- The `stream_ordering` of the last event according to the `bump_event_types`
|
||||
bump_stamp BIGINT,
|
||||
-- `m.room.create` -> `content.type` (current state)
|
||||
--
|
||||
-- Useful for the `spaces`/`not_spaces` filter in the Sliding Sync API
|
||||
room_type TEXT,
|
||||
-- `m.room.name` -> `content.name` (current state)
|
||||
--
|
||||
-- Useful for the room meta data and `room_name_like` filter in the Sliding Sync API
|
||||
room_name TEXT,
|
||||
-- `m.room.encryption` -> `content.algorithm` (current state)
|
||||
--
|
||||
-- Useful for the `is_encrypted` filter in the Sliding Sync API
|
||||
is_encrypted BOOLEAN DEFAULT FALSE NOT NULL,
|
||||
-- `m.room.tombstone` -> `content.replacement_room` (according to the current state at the
|
||||
-- time of the membership).
|
||||
--
|
||||
-- Useful for the `include_old_rooms` functionality in the Sliding Sync API
|
||||
tombstone_successor_room_id TEXT,
|
||||
PRIMARY KEY (room_id)
|
||||
);
|
||||
|
||||
-- So we can purge rooms easily.
|
||||
--
|
||||
-- The primary key is already `room_id`
|
||||
|
||||
-- So we can sort by `stream_ordering
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_joined_rooms_event_stream_ordering ON sliding_sync_joined_rooms(event_stream_ordering);
|
||||
|
||||
-- A table for storing a snapshot of room meta data (historical current state relevant
|
||||
-- for sliding sync) at the time of a local user's membership. Only has rows for the
|
||||
-- latest membership event for a given local user in a room which matches
|
||||
-- `local_current_membership` .
|
||||
--
|
||||
-- We store all memberships including joins. This makes it easy to reference this table
|
||||
-- to find all membership for a given user and shares the same semantics as
|
||||
-- `local_current_membership`. And we get to avoid some table maintenance; if we only
|
||||
-- stored non-joins, we would have to delete the row for the user when the user joins
|
||||
-- the room. Stripped state doesn't include the `m.room.tombstone` event, so we just
|
||||
-- assume that the room doesn't have a tombstone.
|
||||
--
|
||||
-- For remote invite/knocks where the server is not participating in the room, we will
|
||||
-- use stripped state events to populate this table. We assume that if any stripped
|
||||
-- state is given, it will include all possible stripped state events types. For
|
||||
-- example, if stripped state is given but `m.room.encryption` isn't included, we will
|
||||
-- assume that the room is not encrypted.
|
||||
--
|
||||
-- We don't include `bump_stamp` here because we can just use the `stream_ordering` from
|
||||
-- the membership event itself as the `bump_stamp`.
|
||||
CREATE TABLE IF NOT EXISTS sliding_sync_membership_snapshots(
|
||||
room_id TEXT NOT NULL REFERENCES rooms(room_id),
|
||||
user_id TEXT NOT NULL,
|
||||
-- Useful to be able to tell leaves from kicks (where the `user_id` is different from the `sender`)
|
||||
sender TEXT NOT NULL,
|
||||
membership_event_id TEXT NOT NULL REFERENCES events(event_id),
|
||||
membership TEXT NOT NULL,
|
||||
-- This is an integer just to match `room_memberships` and also means we don't need
|
||||
-- to do any casting.
|
||||
forgotten INTEGER DEFAULT 0 NOT NULL,
|
||||
-- `stream_ordering` of the `membership_event_id`
|
||||
event_stream_ordering BIGINT NOT NULL REFERENCES events(stream_ordering),
|
||||
-- For remote invites/knocks that don't include any stripped state, we want to be
|
||||
-- able to distinguish between a room with `None` as valid value for some state and
|
||||
-- room where the state is completely unknown. Basically, this should be True unless
|
||||
-- no stripped state was provided for a remote invite/knock (False).
|
||||
has_known_state BOOLEAN DEFAULT FALSE NOT NULL,
|
||||
-- `m.room.create` -> `content.type` (according to the current state at the time of
|
||||
-- the membership).
|
||||
--
|
||||
-- Useful for the `spaces`/`not_spaces` filter in the Sliding Sync API
|
||||
room_type TEXT,
|
||||
-- `m.room.name` -> `content.name` (according to the current state at the time of
|
||||
-- the membership).
|
||||
--
|
||||
-- Useful for the room meta data and `room_name_like` filter in the Sliding Sync API
|
||||
room_name TEXT,
|
||||
-- `m.room.encryption` -> `content.algorithm` (according to the current state at the
|
||||
-- time of the membership).
|
||||
--
|
||||
-- Useful for the `is_encrypted` filter in the Sliding Sync API
|
||||
is_encrypted BOOLEAN DEFAULT FALSE NOT NULL,
|
||||
-- `m.room.tombstone` -> `content.replacement_room` (according to the current state at the
|
||||
-- time of the membership).
|
||||
--
|
||||
-- Useful for the `include_old_rooms` functionality in the Sliding Sync API
|
||||
tombstone_successor_room_id TEXT,
|
||||
PRIMARY KEY (room_id, user_id)
|
||||
);
|
||||
|
||||
-- So we can purge rooms easily.
|
||||
--
|
||||
-- Since we're using a multi-column index as the primary key (room_id, user_id), the
|
||||
-- first index column (room_id) is always usable for searching so we don't need to
|
||||
-- create a separate index for it.
|
||||
--
|
||||
-- CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_room_id ON sliding_sync_membership_snapshots(room_id);
|
||||
|
||||
-- So we can fetch all rooms for a given user
|
||||
CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_sync_membership_snapshots(user_id);
|
||||
-- So we can sort by `stream_ordering
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_event_stream_ordering ON sliding_sync_membership_snapshots(event_stream_ordering);
|
||||
|
||||
|
||||
-- Add a series of background updates to populate the new `sliding_sync_joined_rooms` table:
|
||||
--
|
||||
-- 1. Add a background update to prefill `sliding_sync_joined_rooms_to_recalculate`.
|
||||
-- We do a one-shot bulk insert from the `rooms` table to prefill.
|
||||
-- 2. Add a background update to populate the new `sliding_sync_joined_rooms` table
|
||||
--
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(8701, 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update', '{}');
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
|
||||
(8701, 'sliding_sync_joined_rooms_bg_update', '{}', 'sliding_sync_prefill_joined_rooms_to_recalculate_table_bg_update');
|
||||
|
||||
-- Add a background updates to populate the new `sliding_sync_membership_snapshots` table
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(8701, 'sliding_sync_membership_snapshots_bg_update', '{}');
|
||||
@@ -1,78 +0,0 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2024 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
|
||||
-- Table to track active sliding sync connections.
|
||||
--
|
||||
-- A new connection will be created for every sliding sync request without a
|
||||
-- `since` token for a given `conn_id` for a device.#
|
||||
--
|
||||
-- Once a new connection is created and used we delete all other connections for
|
||||
-- the `conn_id`.
|
||||
CREATE TABLE sliding_sync_connections(
|
||||
connection_key $%AUTO_INCREMENT_PRIMARY_KEY%$,
|
||||
user_id TEXT NOT NULL,
|
||||
device_id TEXT NOT NULL,
|
||||
conn_id TEXT NOT NULL,
|
||||
created_ts BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX sliding_sync_connections_idx ON sliding_sync_connections(user_id, device_id, conn_id);
|
||||
|
||||
-- We track per-connection state by associating changes to the state with
|
||||
-- connection positions. This ensures that we correctly track state even if we
|
||||
-- see retries of requests.
|
||||
--
|
||||
-- If the client starts a "new" connection (by not specifying a since token),
|
||||
-- we'll clear out the other connections (to ensure that we don't end up with
|
||||
-- lots of connection keys).
|
||||
CREATE TABLE sliding_sync_connection_positions(
|
||||
connection_position $%AUTO_INCREMENT_PRIMARY_KEY%$,
|
||||
connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
|
||||
created_ts BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX sliding_sync_connection_positions_key ON sliding_sync_connection_positions(connection_key);
|
||||
|
||||
|
||||
-- To save space we deduplicate the `required_state` json by assigning IDs to
|
||||
-- different values.
|
||||
CREATE TABLE sliding_sync_connection_required_state(
|
||||
required_state_id $%AUTO_INCREMENT_PRIMARY_KEY%$,
|
||||
connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
|
||||
required_state TEXT NOT NULL -- We store this as a json list of event type / state key tuples.
|
||||
);
|
||||
|
||||
CREATE INDEX sliding_sync_connection_required_state_conn_pos ON sliding_sync_connections(connection_key);
|
||||
|
||||
|
||||
-- Stores the room configs we have seen for rooms in a connection.
|
||||
CREATE TABLE sliding_sync_connection_room_configs(
|
||||
connection_position BIGINT NOT NULL REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE,
|
||||
room_id TEXT NOT NULL,
|
||||
timeline_limit BIGINT NOT NULL,
|
||||
required_state_id BIGINT NOT NULL REFERENCES sliding_sync_connection_required_state(required_state_id)
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX sliding_sync_connection_room_configs_idx ON sliding_sync_connection_room_configs(connection_position, room_id);
|
||||
|
||||
-- Stores what data we have sent for given streams down given connections.
|
||||
CREATE TABLE sliding_sync_connection_streams(
|
||||
connection_position BIGINT NOT NULL REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE,
|
||||
stream TEXT NOT NULL, -- e.g. "events" or "receipts"
|
||||
room_id TEXT NOT NULL,
|
||||
room_status TEXT NOT NULL, -- "live" or "previously", i.e. the `HaveSentRoomFlag` value
|
||||
last_position TEXT -- For "previously" the token for the stream we have sent up to.
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX sliding_sync_connection_streams_idx ON sliding_sync_connection_streams(connection_position, room_id, stream);
|
||||
@@ -1,22 +0,0 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2024 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
|
||||
-- Stores the room lists for a connection
|
||||
CREATE TABLE sliding_sync_connection_room_lists(
|
||||
connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
|
||||
list_name TEXT NOT NULL,
|
||||
room_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX sliding_sync_connection_room_lists_idx ON sliding_sync_connection_room_lists(connection_key);
|
||||
@@ -17,9 +17,33 @@
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Dict, Final, List, Mapping, Optional, Sequence, Tuple
|
||||
|
||||
import attr
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from typing import List, Optional, TypedDict
|
||||
from synapse._pydantic_compat import HAS_PYDANTIC_V2
|
||||
|
||||
if TYPE_CHECKING or HAS_PYDANTIC_V2:
|
||||
from pydantic.v1 import Extra
|
||||
else:
|
||||
from pydantic import Extra
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import (
|
||||
DeviceListUpdates,
|
||||
JsonDict,
|
||||
JsonMapping,
|
||||
Requester,
|
||||
SlidingSyncStreamToken,
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
|
||||
|
||||
class ShutdownRoomParams(TypedDict):
|
||||
@@ -77,3 +101,335 @@ class ShutdownRoomResponse(TypedDict):
|
||||
failed_to_kick_users: List[str]
|
||||
local_aliases: List[str]
|
||||
new_room_id: Optional[str]
|
||||
|
||||
|
||||
class SlidingSyncConfig(SlidingSyncBody):
|
||||
"""
|
||||
Inherit from `SlidingSyncBody` since we need all of the same fields and add a few
|
||||
extra fields that we need in the handler
|
||||
"""
|
||||
|
||||
user: UserID
|
||||
requester: Requester
|
||||
|
||||
# Pydantic config
|
||||
class Config:
|
||||
# By default, ignore fields that we don't recognise.
|
||||
extra = Extra.ignore
|
||||
# By default, don't allow fields to be reassigned after parsing.
|
||||
allow_mutation = False
|
||||
# Allow custom types like `UserID` to be used in the model
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
|
||||
class OperationType(Enum):
|
||||
"""
|
||||
Represents the operation types in a Sliding Sync window.
|
||||
|
||||
Attributes:
|
||||
SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about
|
||||
entries in this range.
|
||||
INSERT: Sets a single entry. If the position is not empty then clients MUST move
|
||||
entries to the left or the right depending on where the closest empty space is.
|
||||
DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
|
||||
places.
|
||||
INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
|
||||
offline support, but they should be treated as empty when additional operations
|
||||
which concern indexes in the range arrive from the server.
|
||||
"""
|
||||
|
||||
SYNC: Final = "SYNC"
|
||||
INSERT: Final = "INSERT"
|
||||
DELETE: Final = "DELETE"
|
||||
INVALIDATE: Final = "INVALIDATE"
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class SlidingSyncResult:
|
||||
"""
|
||||
The Sliding Sync result to be serialized to JSON for a response.
|
||||
|
||||
Attributes:
|
||||
next_pos: The next position token in the sliding window to request (next_batch).
|
||||
lists: Sliding window API. A map of list key to list results.
|
||||
rooms: Room subscription API. A map of room ID to room results.
|
||||
extensions: Extensions API. A map of extension key to extension results.
|
||||
"""
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class RoomResult:
|
||||
"""
|
||||
Attributes:
|
||||
name: Room name or calculated room name.
|
||||
avatar: Room avatar
|
||||
heroes: List of stripped membership events (containing `user_id` and optionally
|
||||
`avatar_url` and `displayname`) for the users used to calculate the room name.
|
||||
is_dm: Flag to specify whether the room is a direct-message room (most likely
|
||||
between two people).
|
||||
initial: Flag which is set when this is the first time the server is sending this
|
||||
data on this connection. Clients can use this flag to replace or update
|
||||
their local state. When there is an update, servers MUST omit this flag
|
||||
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
|
||||
absence of this flag means 'false'.
|
||||
unstable_expanded_timeline: Flag which is set if we're returning more historic
|
||||
events due to the timeline limit having increased. See "XXX: Odd behavior"
|
||||
comment ing `synapse.handlers.sliding_sync`.
|
||||
required_state: The current state of the room
|
||||
timeline: Latest events in the room. The last event is the most recent.
|
||||
bundled_aggregations: A mapping of event ID to the bundled aggregations for
|
||||
the timeline events above. This allows clients to show accurate reaction
|
||||
counts (or edits, threads), even if some of the reaction events were skipped
|
||||
over in a gappy sync.
|
||||
stripped_state: Stripped state events (for rooms where the usre is
|
||||
invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
|
||||
absent on joined/left rooms
|
||||
prev_batch: A token that can be passed as a start parameter to the
|
||||
`/rooms/<room_id>/messages` API to retrieve earlier messages.
|
||||
limited: True if there are more events than `timeline_limit` looking
|
||||
backwards from the `response.pos` to the `request.pos`.
|
||||
num_live: The number of timeline events which have just occurred and are not historical.
|
||||
The last N events are 'live' and should be treated as such. This is mostly
|
||||
useful to determine whether a given @mention event should make a noise or not.
|
||||
Clients cannot rely solely on the absence of `initial: true` to determine live
|
||||
events because if a room not in the sliding window bumps into the window because
|
||||
of an @mention it will have `initial: true` yet contain a single live event
|
||||
(with potentially other old events in the timeline).
|
||||
bump_stamp: The `stream_ordering` of the last event according to the
|
||||
`bump_event_types`. This helps clients sort more readily without them
|
||||
needing to pull in a bunch of the timeline to determine the last activity.
|
||||
`bump_event_types` is a thing because for example, we don't want display
|
||||
name changes to mark the room as unread and bump it to the top. For
|
||||
encrypted rooms, we just have to consider any activity as a bump because we
|
||||
can't see the content and the client has to figure it out for themselves.
|
||||
joined_count: The number of users with membership of join, including the client's
|
||||
own user ID. (same as sync `v2 m.joined_member_count`)
|
||||
invited_count: The number of users with membership of invite. (same as sync v2
|
||||
`m.invited_member_count`)
|
||||
notification_count: The total number of unread notifications for this room. (same
|
||||
as sync v2)
|
||||
highlight_count: The number of unread notifications for this room with the highlight
|
||||
flag set. (same as sync v2)
|
||||
"""
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class StrippedHero:
|
||||
user_id: str
|
||||
display_name: Optional[str]
|
||||
avatar_url: Optional[str]
|
||||
|
||||
name: Optional[str]
|
||||
avatar: Optional[str]
|
||||
heroes: Optional[List[StrippedHero]]
|
||||
is_dm: bool
|
||||
initial: bool
|
||||
unstable_expanded_timeline: bool
|
||||
# Should be empty for invite/knock rooms with `stripped_state`
|
||||
required_state: List[EventBase]
|
||||
# Should be empty for invite/knock rooms with `stripped_state`
|
||||
timeline_events: List[EventBase]
|
||||
bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
|
||||
# Optional because it's only relevant to invite/knock rooms
|
||||
stripped_state: List[JsonDict]
|
||||
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
|
||||
prev_batch: Optional[StreamToken]
|
||||
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
|
||||
limited: Optional[bool]
|
||||
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
|
||||
num_live: Optional[int]
|
||||
bump_stamp: int
|
||||
joined_count: int
|
||||
invited_count: int
|
||||
notification_count: int
|
||||
highlight_count: int
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return (
|
||||
# If this is the first time the client is seeing the room, we should not filter it out
|
||||
# under any circumstance.
|
||||
self.initial
|
||||
# We need to let the client know if there are any new events
|
||||
or bool(self.required_state)
|
||||
or bool(self.timeline_events)
|
||||
or bool(self.stripped_state)
|
||||
)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class SlidingWindowList:
|
||||
"""
|
||||
Attributes:
|
||||
count: The total number of entries in the list. Always present if this list
|
||||
is.
|
||||
ops: The sliding list operations to perform.
|
||||
"""
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class Operation:
|
||||
"""
|
||||
Attributes:
|
||||
op: The operation type to perform.
|
||||
range: Which index positions are affected by this operation. These are
|
||||
both inclusive.
|
||||
room_ids: Which room IDs are affected by this operation. These IDs match
|
||||
up to the positions in the `range`, so the last room ID in this list
|
||||
matches the 9th index. The room data is held in a separate object.
|
||||
"""
|
||||
|
||||
op: OperationType
|
||||
range: Tuple[int, int]
|
||||
room_ids: List[str]
|
||||
|
||||
count: int
|
||||
ops: List[Operation]
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class Extensions:
|
||||
"""Responses for extensions
|
||||
|
||||
Attributes:
|
||||
to_device: The to-device extension (MSC3885)
|
||||
e2ee: The E2EE device extension (MSC3884)
|
||||
"""
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class ToDeviceExtension:
|
||||
"""The to-device extension (MSC3885)
|
||||
|
||||
Attributes:
|
||||
next_batch: The to-device stream token the client should use
|
||||
to get more results
|
||||
events: A list of to-device messages for the client
|
||||
"""
|
||||
|
||||
next_batch: str
|
||||
events: Sequence[JsonMapping]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(self.events)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class E2eeExtension:
|
||||
"""The E2EE device extension (MSC3884)
|
||||
|
||||
Attributes:
|
||||
device_list_updates: List of user_ids whose devices have changed or left (only
|
||||
present on incremental syncs).
|
||||
device_one_time_keys_count: Map from key algorithm to the number of
|
||||
unclaimed one-time keys currently held on the server for this device. If
|
||||
an algorithm is unlisted, the count for that algorithm is assumed to be
|
||||
zero. If this entire parameter is missing, the count for all algorithms
|
||||
is assumed to be zero.
|
||||
device_unused_fallback_key_types: List of unused fallback key algorithms
|
||||
for this device.
|
||||
"""
|
||||
|
||||
# Only present on incremental syncs
|
||||
device_list_updates: Optional[DeviceListUpdates]
|
||||
device_one_time_keys_count: Mapping[str, int]
|
||||
device_unused_fallback_key_types: Sequence[str]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
# Note that "signed_curve25519" is always returned in key count responses
|
||||
# regardless of whether we uploaded any keys for it. This is necessary until
|
||||
# https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
|
||||
#
|
||||
# Also related:
|
||||
# https://github.com/element-hq/element-android/issues/3725 and
|
||||
# https://github.com/matrix-org/synapse/issues/10456
|
||||
default_otk = self.device_one_time_keys_count.get("signed_curve25519")
|
||||
more_than_default_otk = len(self.device_one_time_keys_count) > 1 or (
|
||||
default_otk is not None and default_otk > 0
|
||||
)
|
||||
|
||||
return bool(
|
||||
more_than_default_otk
|
||||
or self.device_list_updates
|
||||
or self.device_unused_fallback_key_types
|
||||
)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class AccountDataExtension:
|
||||
"""The Account Data extension (MSC3959)
|
||||
|
||||
Attributes:
|
||||
global_account_data_map: Mapping from `type` to `content` of global account
|
||||
data events.
|
||||
account_data_by_room_map: Mapping from room_id to mapping of `type` to
|
||||
`content` of room account data events.
|
||||
"""
|
||||
|
||||
global_account_data_map: Mapping[str, JsonMapping]
|
||||
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(
|
||||
self.global_account_data_map or self.account_data_by_room_map
|
||||
)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class ReceiptsExtension:
|
||||
"""The Receipts extension (MSC3960)
|
||||
|
||||
Attributes:
|
||||
room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
|
||||
event (type, content)
|
||||
"""
|
||||
|
||||
room_id_to_receipt_map: Mapping[str, JsonMapping]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(self.room_id_to_receipt_map)
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class TypingExtension:
|
||||
"""The Typing Notification extension (MSC3961)
|
||||
|
||||
Attributes:
|
||||
room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
|
||||
event (type, content)
|
||||
"""
|
||||
|
||||
room_id_to_typing_map: Mapping[str, JsonMapping]
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(self.room_id_to_typing_map)
|
||||
|
||||
to_device: Optional[ToDeviceExtension] = None
|
||||
e2ee: Optional[E2eeExtension] = None
|
||||
account_data: Optional[AccountDataExtension] = None
|
||||
receipts: Optional[ReceiptsExtension] = None
|
||||
typing: Optional[TypingExtension] = None
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(
|
||||
self.to_device
|
||||
or self.e2ee
|
||||
or self.account_data
|
||||
or self.receipts
|
||||
or self.typing
|
||||
)
|
||||
|
||||
next_pos: SlidingSyncStreamToken
|
||||
lists: Dict[str, SlidingWindowList]
|
||||
rooms: Dict[str, RoomResult]
|
||||
extensions: Extensions
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
"""Make the result appear empty if there are no updates. This is used
|
||||
to tell if the notifier needs to wait for more events when polling for
|
||||
events.
|
||||
"""
|
||||
# We don't include `self.lists` here, as a) `lists` is always non-empty even if
|
||||
# there are no changes, and b) since we're sorting rooms by `stream_ordering` of
|
||||
# the latest activity, anything that would cause the order to change would end
|
||||
# up in `self.rooms` and cause us to send down the change.
|
||||
return bool(self.rooms or self.extensions)
|
||||
|
||||
@staticmethod
|
||||
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
|
||||
"Return a new empty result"
|
||||
return SlidingSyncResult(
|
||||
next_pos=next_pos,
|
||||
lists={},
|
||||
rooms={},
|
||||
extensions=SlidingSyncResult.Extensions(),
|
||||
)
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#
|
||||
#
|
||||
import logging
|
||||
from copy import deepcopy
|
||||
from typing import Dict, List, Optional
|
||||
from unittest.mock import patch
|
||||
|
||||
@@ -46,7 +47,7 @@ from synapse.rest.client import knock, login, room
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
from synapse.types import JsonDict, StreamToken, UserID
|
||||
from synapse.types.handlers.sliding_sync import SlidingSyncConfig
|
||||
from synapse.types.handlers import SlidingSyncConfig
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests.replication._base import BaseMultiWorkerStreamTestCase
|
||||
@@ -565,11 +566,23 @@ class RoomSyncConfigTestCase(TestCase):
|
||||
"""
|
||||
Combine A into B and B into A to make sure we get the same result.
|
||||
"""
|
||||
combined_config = a.combine_room_sync_config(b)
|
||||
self._assert_room_config_equal(combined_config, expected, "B into A")
|
||||
# Since we're mutating these in place, make a copy for each of our trials
|
||||
room_sync_config_a = deepcopy(a)
|
||||
room_sync_config_b = deepcopy(b)
|
||||
|
||||
combined_config = a.combine_room_sync_config(b)
|
||||
self._assert_room_config_equal(combined_config, expected, "A into B")
|
||||
# Combine B into A
|
||||
room_sync_config_a.combine_room_sync_config(room_sync_config_b)
|
||||
|
||||
self._assert_room_config_equal(room_sync_config_a, expected, "B into A")
|
||||
|
||||
# Since we're mutating these in place, make a copy for each of our trials
|
||||
room_sync_config_a = deepcopy(a)
|
||||
room_sync_config_b = deepcopy(b)
|
||||
|
||||
# Combine A into B
|
||||
room_sync_config_b.combine_room_sync_config(room_sync_config_a)
|
||||
|
||||
self._assert_room_config_equal(room_sync_config_b, expected, "A into B")
|
||||
|
||||
|
||||
class GetRoomMembershipForUserAtToTokenTestCase(HomeserverTestCase):
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
import io
|
||||
from typing import Any, Dict, Generator
|
||||
from unittest.mock import ANY, Mock, create_autospec
|
||||
|
||||
@@ -32,7 +33,9 @@ from twisted.web.http import HTTPChannel
|
||||
from twisted.web.http_headers import Headers
|
||||
|
||||
from synapse.api.errors import HttpResponseException, RequestSendFailed
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.ratelimiting import RatelimitSettings
|
||||
from synapse.http.matrixfederationclient import (
|
||||
ByteParser,
|
||||
MatrixFederationHttpClient,
|
||||
@@ -337,6 +340,71 @@ class FederationClientTests(HomeserverTestCase):
|
||||
r = self.successResultOf(d)
|
||||
self.assertEqual(r.code, 200)
|
||||
|
||||
def test_authed_media_redirect_response(self) -> None:
|
||||
"""
|
||||
Validate that, when following a `Location` redirect, the
|
||||
maximum size is _not_ set to the initial response `Content-Length` and
|
||||
the media file can be downloaded.
|
||||
"""
|
||||
limiter = Ratelimiter(
|
||||
store=self.hs.get_datastores().main,
|
||||
clock=self.clock,
|
||||
cfg=RatelimitSettings(key="", per_second=0.17, burst_count=1048576),
|
||||
)
|
||||
|
||||
output_stream = io.BytesIO()
|
||||
|
||||
d = defer.ensureDeferred(
|
||||
self.cl.federation_get_file(
|
||||
"testserv:8008", "path", output_stream, limiter, "127.0.0.1", 10000
|
||||
)
|
||||
)
|
||||
|
||||
self.pump()
|
||||
|
||||
conn = Mock()
|
||||
clients = self.reactor.tcpClients
|
||||
client = clients[0][2].buildProtocol(None)
|
||||
client.makeConnection(conn)
|
||||
|
||||
# Deferred does not have a result
|
||||
self.assertNoResult(d)
|
||||
|
||||
redirect_data = b"\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: application/json\r\n\r\n{}\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nLocation: http://testserv:8008/ab/c1/2345.txt\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n"
|
||||
client.dataReceived(
|
||||
b"HTTP/1.1 200 OK\r\n"
|
||||
b"Server: Fake\r\n"
|
||||
b"Content-Length: %i\r\n"
|
||||
b"Content-Type: multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a\r\n\r\n"
|
||||
% (len(redirect_data))
|
||||
)
|
||||
client.dataReceived(redirect_data)
|
||||
|
||||
# Still no result, not followed the redirect yet
|
||||
self.assertNoResult(d)
|
||||
|
||||
# Now send the response returned by the server at `Location`
|
||||
conn = Mock()
|
||||
clients = self.reactor.tcpClients
|
||||
client = clients[1][2].buildProtocol(None)
|
||||
client.makeConnection(conn)
|
||||
|
||||
# make sure the length is longer than the initial response
|
||||
data = b"Hello world!" * 30
|
||||
client.dataReceived(
|
||||
b"HTTP/1.1 200 OK\r\n"
|
||||
b"Server: Fake\r\n"
|
||||
b"Content-Length: %i\r\n"
|
||||
b"Content-Type: text/plain\r\n"
|
||||
b"\r\n"
|
||||
b"%s\r\n"
|
||||
b"\r\n" % (len(data), data)
|
||||
)
|
||||
|
||||
# We should get a successful response
|
||||
length, _, _ = self.successResultOf(d)
|
||||
self.assertEqual(length, len(data))
|
||||
|
||||
@parameterized.expand(["get_json", "post_json", "delete_json", "put_json"])
|
||||
def test_timeout_reading_body(self, method_name: str) -> None:
|
||||
"""
|
||||
|
||||
@@ -16,7 +16,7 @@ import logging
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
import synapse.rest.admin
|
||||
from synapse.api.constants import EventContentFields, EventTypes, Membership
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.rest.client import login, room, sync
|
||||
from synapse.server import HomeServer
|
||||
@@ -44,10 +44,6 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.store = hs.get_datastores().main
|
||||
self.storage_controllers = hs.get_storage_controllers()
|
||||
self.state_handler = self.hs.get_state_handler()
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
self.persistence = persistence
|
||||
|
||||
def test_rooms_meta_when_joined(self) -> None:
|
||||
"""
|
||||
@@ -604,16 +600,16 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
||||
Test that `bump_stamp` ignores backfilled events, i.e. events with a
|
||||
negative stream ordering.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
# Create a remote room
|
||||
creator = "@user:other"
|
||||
room_id = "!foo:other"
|
||||
room_version = RoomVersions.V10
|
||||
shared_kwargs = {
|
||||
"room_id": room_id,
|
||||
"room_version": room_version.identifier,
|
||||
"room_version": "10",
|
||||
}
|
||||
|
||||
create_tuple = self.get_success(
|
||||
@@ -622,12 +618,6 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
||||
prev_event_ids=[],
|
||||
type=EventTypes.Create,
|
||||
state_key="",
|
||||
content={
|
||||
# The `ROOM_CREATOR` field could be removed if we used a room
|
||||
# version > 10 (in favor of relying on `sender`)
|
||||
EventContentFields.ROOM_CREATOR: creator,
|
||||
EventContentFields.ROOM_VERSION: room_version.identifier,
|
||||
},
|
||||
sender=creator,
|
||||
**shared_kwargs,
|
||||
)
|
||||
@@ -677,29 +667,22 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
||||
]
|
||||
|
||||
# Ensure the local HS knows the room version
|
||||
self.get_success(self.store.store_room(room_id, creator, False, room_version))
|
||||
self.get_success(
|
||||
self.store.store_room(room_id, creator, False, RoomVersions.V10)
|
||||
)
|
||||
|
||||
# Persist these events as backfilled events.
|
||||
for event, context in remote_events_and_contexts:
|
||||
self.get_success(
|
||||
self.persistence.persist_event(event, context, backfilled=True)
|
||||
)
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
|
||||
# Now we join the local user to the room. We want to make this feel as close to
|
||||
# the real `process_remote_join()` as possible but we'd like to avoid some of
|
||||
# the auth checks that would be done in the real code.
|
||||
#
|
||||
# FIXME: The test was originally written using this less-real
|
||||
# `persist_event(...)` shortcut but it would be nice to use the real remote join
|
||||
# process in a `FederatingHomeserverTestCase`.
|
||||
flawed_join_tuple = self.get_success(
|
||||
for event, context in remote_events_and_contexts:
|
||||
self.get_success(persistence.persist_event(event, context, backfilled=True))
|
||||
|
||||
# Now we join the local user to the room
|
||||
join_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[invite_tuple[0].event_id],
|
||||
# This doesn't work correctly to create an `EventContext` that includes
|
||||
# both of these state events. I assume it's because we're working on our
|
||||
# local homeserver which has the remote state set as `outlier`. We have
|
||||
# to create our own EventContext below to get this right.
|
||||
auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id],
|
||||
type=EventTypes.Member,
|
||||
state_key=user1_id,
|
||||
@@ -708,22 +691,7 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
# We have to create our own context to get the state set correctly. If we use
|
||||
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
|
||||
# table will only have the join event in it which should never happen in our
|
||||
# real server.
|
||||
join_event = flawed_join_tuple[0]
|
||||
join_context = self.get_success(
|
||||
self.state_handler.compute_event_context(
|
||||
join_event,
|
||||
state_ids_before_event={
|
||||
(e.type, e.state_key): e.event_id
|
||||
for e in [create_tuple[0], invite_tuple[0]]
|
||||
},
|
||||
partial_state=False,
|
||||
)
|
||||
)
|
||||
self.get_success(self.persistence.persist_event(join_event, join_context))
|
||||
self.get_success(persistence.persist_event(*join_tuple))
|
||||
|
||||
# Doing an SS request should return a positive `bump_stamp`, even though
|
||||
# the only event that matches the bump types has as negative stream
|
||||
|
||||
@@ -191,14 +191,8 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
||||
}
|
||||
_, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# Reset the positions
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_delete(
|
||||
table="sliding_sync_connections",
|
||||
keyvalues={"user_id": user1_id},
|
||||
desc="clear_cache",
|
||||
)
|
||||
)
|
||||
# Reset the in-memory cache
|
||||
self.hs.get_sliding_sync_handler().connection_store._connections.clear()
|
||||
|
||||
# Make the Sliding Sync request
|
||||
channel = self.make_request(
|
||||
|
||||
@@ -112,24 +112,6 @@ class UpdateUpsertManyTests(unittest.HomeserverTestCase):
|
||||
{(1, "user1", "hello"), (2, "user2", "bleb")},
|
||||
)
|
||||
|
||||
self.get_success(
|
||||
self.storage.db_pool.runInteraction(
|
||||
"test",
|
||||
self.storage.db_pool.simple_upsert_many_txn,
|
||||
self.table_name,
|
||||
key_names=key_names,
|
||||
key_values=[[2, "user2"]],
|
||||
value_names=[],
|
||||
value_values=[],
|
||||
)
|
||||
)
|
||||
|
||||
# Check results are what we expect
|
||||
self.assertEqual(
|
||||
set(self._dump_table_to_tuple()),
|
||||
{(1, "user1", "hello"), (2, "user2", "bleb")},
|
||||
)
|
||||
|
||||
def test_simple_update_many(self) -> None:
|
||||
"""
|
||||
simple_update_many performs many updates at once.
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
#
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
@@ -36,8 +35,6 @@ from synapse.util import Clock
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ExtremPruneTestCase(HomeserverTestCase):
|
||||
servlets = [
|
||||
|
||||
@@ -24,7 +24,7 @@ from typing import List, Optional, Tuple, cast
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.constants import EventContentFields, EventTypes, JoinRules, Membership
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.admin import register_servlets_for_client_rest_resource
|
||||
@@ -38,7 +38,6 @@ from synapse.util import Clock
|
||||
from tests import unittest
|
||||
from tests.server import TestHomeServer
|
||||
from tests.test_utils import event_injection
|
||||
from tests.test_utils.event_injection import create_event
|
||||
from tests.unittest import skip_unless
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -55,10 +54,6 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
|
||||
# We can't test the RoomMemberStore on its own without the other event
|
||||
# storage logic
|
||||
self.store = hs.get_datastores().main
|
||||
self.state_handler = self.hs.get_state_handler()
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
self.persistence = persistence
|
||||
|
||||
self.u_alice = self.register_user("alice", "pass")
|
||||
self.t_alice = self.login("alice", "pass")
|
||||
@@ -225,166 +220,31 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
|
||||
def test_join_locally_forgotten_room(self) -> None:
|
||||
"""
|
||||
Tests if a user joins a forgotten room, the room is not forgotten anymore.
|
||||
|
||||
Since a room can't be re-joined if everyone has left. This can only happen with
|
||||
a room with remote users in it.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
# Create a remote room
|
||||
creator = "@user:other"
|
||||
room_id = "!foo:other"
|
||||
room_version = RoomVersions.V10
|
||||
shared_kwargs = {
|
||||
"room_id": room_id,
|
||||
"room_version": room_version.identifier,
|
||||
}
|
||||
|
||||
create_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[],
|
||||
type=EventTypes.Create,
|
||||
state_key="",
|
||||
content={
|
||||
# The `ROOM_CREATOR` field could be removed if we used a room
|
||||
# version > 10 (in favor of relying on `sender`)
|
||||
EventContentFields.ROOM_CREATOR: creator,
|
||||
EventContentFields.ROOM_VERSION: room_version.identifier,
|
||||
},
|
||||
sender=creator,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
creator_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[create_tuple[0].event_id],
|
||||
auth_event_ids=[create_tuple[0].event_id],
|
||||
type=EventTypes.Member,
|
||||
state_key=creator,
|
||||
content={"membership": Membership.JOIN},
|
||||
sender=creator,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
|
||||
remote_events_and_contexts = [
|
||||
create_tuple,
|
||||
creator_tuple,
|
||||
]
|
||||
|
||||
# Ensure the local HS knows the room version
|
||||
self.get_success(self.store.store_room(room_id, creator, False, room_version))
|
||||
|
||||
# Persist these events as backfilled events.
|
||||
for event, context in remote_events_and_contexts:
|
||||
self.get_success(
|
||||
self.persistence.persist_event(event, context, backfilled=True)
|
||||
)
|
||||
|
||||
# Now we join the local user to the room. We want to make this feel as close to
|
||||
# the real `process_remote_join()` as possible but we'd like to avoid some of
|
||||
# the auth checks that would be done in the real code.
|
||||
#
|
||||
# FIXME: The test was originally written using this less-real
|
||||
# `persist_event(...)` shortcut but it would be nice to use the real remote join
|
||||
# process in a `FederatingHomeserverTestCase`.
|
||||
flawed_join_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[creator_tuple[0].event_id],
|
||||
# This doesn't work correctly to create an `EventContext` that includes
|
||||
# both of these state events. I assume it's because we're working on our
|
||||
# local homeserver which has the remote state set as `outlier`. We have
|
||||
# to create our own EventContext below to get this right.
|
||||
auth_event_ids=[create_tuple[0].event_id],
|
||||
type=EventTypes.Member,
|
||||
state_key=user1_id,
|
||||
content={"membership": Membership.JOIN},
|
||||
sender=user1_id,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
# We have to create our own context to get the state set correctly. If we use
|
||||
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
|
||||
# table will only have the join event in it which should never happen in our
|
||||
# real server.
|
||||
join_event = flawed_join_tuple[0]
|
||||
join_context = self.get_success(
|
||||
self.state_handler.compute_event_context(
|
||||
join_event,
|
||||
state_ids_before_event={
|
||||
(e.type, e.state_key): e.event_id for e in [create_tuple[0]]
|
||||
},
|
||||
partial_state=False,
|
||||
)
|
||||
)
|
||||
self.get_success(self.persistence.persist_event(join_event, join_context))
|
||||
|
||||
# The room shouldn't be forgotten because the local user just joined
|
||||
"""Tests if a user joins a forgotten room the room is not forgotten anymore."""
|
||||
self.room = self.helper.create_room_as(self.u_alice, tok=self.t_alice)
|
||||
self.assertFalse(
|
||||
self.get_success(self.store.is_locally_forgotten_room(room_id))
|
||||
self.get_success(self.store.is_locally_forgotten_room(self.room))
|
||||
)
|
||||
|
||||
# After all of the local users (there is only user1) leave and forgetting the
|
||||
# room, it is forgotten
|
||||
user1_leave_response = self.helper.leave(room_id, user1_id, tok=user1_tok)
|
||||
user1_leave_event = self.get_success(
|
||||
self.store.get_event(user1_leave_response["event_id"])
|
||||
)
|
||||
self.get_success(self.store.forget(user1_id, room_id))
|
||||
self.assertTrue(self.get_success(self.store.is_locally_forgotten_room(room_id)))
|
||||
|
||||
# Join the local user to the room (again). We want to make this feel as close to
|
||||
# the real `process_remote_join()` as possible but we'd like to avoid some of
|
||||
# the auth checks that would be done in the real code.
|
||||
#
|
||||
# FIXME: The test was originally written using this less-real
|
||||
# `event_injection.inject_member_event(...)` shortcut but it would be nice to
|
||||
# use the real remote join process in a `FederatingHomeserverTestCase`.
|
||||
flawed_join_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[user1_leave_response["event_id"]],
|
||||
# This doesn't work correctly to create an `EventContext` that includes
|
||||
# both of these state events. I assume it's because we're working on our
|
||||
# local homeserver which has the remote state set as `outlier`. We have
|
||||
# to create our own EventContext below to get this right.
|
||||
auth_event_ids=[
|
||||
create_tuple[0].event_id,
|
||||
user1_leave_response["event_id"],
|
||||
],
|
||||
type=EventTypes.Member,
|
||||
state_key=user1_id,
|
||||
content={"membership": Membership.JOIN},
|
||||
sender=user1_id,
|
||||
**shared_kwargs,
|
||||
# after leaving and forget the room, it is forgotten
|
||||
self.get_success(
|
||||
event_injection.inject_member_event(
|
||||
self.hs, self.room, self.u_alice, "leave"
|
||||
)
|
||||
)
|
||||
# We have to create our own context to get the state set correctly. If we use
|
||||
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
|
||||
# table will only have the join event in it which should never happen in our
|
||||
# real server.
|
||||
join_event = flawed_join_tuple[0]
|
||||
join_context = self.get_success(
|
||||
self.state_handler.compute_event_context(
|
||||
join_event,
|
||||
state_ids_before_event={
|
||||
(e.type, e.state_key): e.event_id
|
||||
for e in [create_tuple[0], user1_leave_event]
|
||||
},
|
||||
partial_state=False,
|
||||
self.get_success(self.store.forget(self.u_alice, self.room))
|
||||
self.assertTrue(
|
||||
self.get_success(self.store.is_locally_forgotten_room(self.room))
|
||||
)
|
||||
|
||||
# after rejoin the room is not forgotten anymore
|
||||
self.get_success(
|
||||
event_injection.inject_member_event(
|
||||
self.hs, self.room, self.u_alice, "join"
|
||||
)
|
||||
)
|
||||
self.get_success(self.persistence.persist_event(join_event, join_context))
|
||||
|
||||
# After the local user rejoins the remote room, it isn't forgotten anymore
|
||||
self.assertFalse(
|
||||
self.get_success(self.store.is_locally_forgotten_room(room_id))
|
||||
self.get_success(self.store.is_locally_forgotten_room(self.room))
|
||||
)
|
||||
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -272,8 +272,8 @@ class TestCase(unittest.TestCase):
|
||||
|
||||
def assertIncludes(
|
||||
self,
|
||||
actual_items: AbstractSet[TV],
|
||||
expected_items: AbstractSet[TV],
|
||||
actual_items: AbstractSet[str],
|
||||
expected_items: AbstractSet[str],
|
||||
exact: bool = False,
|
||||
message: Optional[str] = None,
|
||||
) -> None:
|
||||
|
||||
Reference in New Issue
Block a user