mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-11 01:40:27 +00:00
Compare commits
43 Commits
madlittlem
...
erikj/pagi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d2b59f2482 | ||
|
|
6c137b321d | ||
|
|
4b7abedfd9 | ||
|
|
f07f99387e | ||
|
|
4c67e06dfb | ||
|
|
92c58932d1 | ||
|
|
3263e12d73 | ||
|
|
c0b2f33dc2 | ||
|
|
3ace9bdff9 | ||
|
|
434c51d538 | ||
|
|
a72919b748 | ||
|
|
62050d2dfb | ||
|
|
bf0edf7a16 | ||
|
|
9df5f81687 | ||
|
|
a7e6ad9f3e | ||
|
|
6c8c061c2f | ||
|
|
7b3324e252 | ||
|
|
8c3fca8b28 | ||
|
|
a90140358b | ||
|
|
baab93b0dd | ||
|
|
839088e2e7 | ||
|
|
6a101e512f | ||
|
|
cdd379b6df | ||
|
|
3b6027dbc1 | ||
|
|
6992fb9bc1 | ||
|
|
22dea0ca37 | ||
|
|
96d6fff447 | ||
|
|
2b0f9bddcf | ||
|
|
3b52bd1cf6 | ||
|
|
e5b3034fc4 | ||
|
|
43cbde4653 | ||
|
|
26c7f08465 | ||
|
|
4902770e32 | ||
|
|
38d90e0d7d | ||
|
|
99a7205093 | ||
|
|
5941346c5b | ||
|
|
573e51cc0b | ||
|
|
39182c3594 | ||
|
|
b999adcaa2 | ||
|
|
d1e9655f75 | ||
|
|
64df836067 | ||
|
|
32d476d4f1 | ||
|
|
a2decbdd66 |
@@ -20,6 +20,10 @@ export DUMP_COVERAGE_COMMAND="coverage help"
|
|||||||
# UNSTABLE or FAILURE this build.
|
# UNSTABLE or FAILURE this build.
|
||||||
export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
|
export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
|
||||||
|
|
||||||
|
TOX_BIN=$WORKSPACE/.tox/py27/bin
|
||||||
|
python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
|
||||||
|
$TOX_BIN/pip install lxml
|
||||||
|
|
||||||
rm .coverage* || echo "No coverage files to remove"
|
rm .coverage* || echo "No coverage files to remove"
|
||||||
|
|
||||||
tox -e py27
|
tox -e py27
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ class Codes(object):
|
|||||||
THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED"
|
THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED"
|
||||||
THREEPID_IN_USE = "THREEPID_IN_USE"
|
THREEPID_IN_USE = "THREEPID_IN_USE"
|
||||||
INVALID_USERNAME = "M_INVALID_USERNAME"
|
INVALID_USERNAME = "M_INVALID_USERNAME"
|
||||||
|
CANNOT_PEEK = "M_CANNOT_PEEK"
|
||||||
|
|
||||||
|
|
||||||
class CodeMessageException(RuntimeError):
|
class CodeMessageException(RuntimeError):
|
||||||
|
|||||||
@@ -20,6 +20,9 @@ from synapse.util.metrics import Measure
|
|||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
from synapse.push.clientformat import format_push_rules_for_user
|
from synapse.push.clientformat import format_push_rules_for_user
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
|
from synapse.types import SyncNextBatchToken, SyncPaginationState
|
||||||
|
from synapse.api.errors import Codes, SynapseError
|
||||||
|
from synapse.storage.tags import (TAG_CHANGE_NEWLY_TAGGED, TAG_CHANGE_ALL_REMOVED)
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
@@ -35,9 +38,48 @@ SyncConfig = collections.namedtuple("SyncConfig", [
|
|||||||
"filter_collection",
|
"filter_collection",
|
||||||
"is_guest",
|
"is_guest",
|
||||||
"request_key",
|
"request_key",
|
||||||
|
"pagination_config",
|
||||||
])
|
])
|
||||||
|
|
||||||
|
|
||||||
|
class SyncPaginationConfig(collections.namedtuple("SyncPaginationConfig", [
|
||||||
|
"order",
|
||||||
|
"limit",
|
||||||
|
"tags",
|
||||||
|
])):
|
||||||
|
"Initial pagination configuration from initial sync."
|
||||||
|
def __init__(self, order, limit, tags):
|
||||||
|
if order not in SYNC_PAGINATION_VALID_ORDERS:
|
||||||
|
raise SynapseError(400, "Invalid 'order'")
|
||||||
|
if tags not in SYNC_PAGINATION_VALID_TAGS_OPTIONS:
|
||||||
|
raise SynapseError(400, "Invalid 'tags'")
|
||||||
|
|
||||||
|
try:
|
||||||
|
limit = int(limit)
|
||||||
|
except:
|
||||||
|
raise SynapseError(400, "Invalid 'limit'")
|
||||||
|
|
||||||
|
super(SyncPaginationConfig, self).__init__(order, limit, tags)
|
||||||
|
|
||||||
|
|
||||||
|
SYNC_PAGINATION_TAGS_INCLUDE_ALL = "m.include_all"
|
||||||
|
SYNC_PAGINATION_TAGS_IGNORE = "m.ignore"
|
||||||
|
SYNC_PAGINATION_VALID_TAGS_OPTIONS = (
|
||||||
|
SYNC_PAGINATION_TAGS_INCLUDE_ALL, SYNC_PAGINATION_TAGS_IGNORE,
|
||||||
|
)
|
||||||
|
|
||||||
|
SYNC_PAGINATION_ORDER_TS = "m.origin_server_ts"
|
||||||
|
SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
|
||||||
|
|
||||||
|
|
||||||
|
SyncExtras = collections.namedtuple("SyncExtras", [
|
||||||
|
"paginate", # dict with "limit" key
|
||||||
|
"peek", # dict of room_id -> dict
|
||||||
|
])
|
||||||
|
|
||||||
|
DEFAULT_SYNC_EXTRAS = SyncExtras(paginate={}, peek={})
|
||||||
|
|
||||||
|
|
||||||
class TimelineBatch(collections.namedtuple("TimelineBatch", [
|
class TimelineBatch(collections.namedtuple("TimelineBatch", [
|
||||||
"prev_batch",
|
"prev_batch",
|
||||||
"events",
|
"events",
|
||||||
@@ -59,6 +101,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
|
|||||||
"ephemeral",
|
"ephemeral",
|
||||||
"account_data",
|
"account_data",
|
||||||
"unread_notifications",
|
"unread_notifications",
|
||||||
|
"synced", # bool
|
||||||
])):
|
])):
|
||||||
__slots__ = []
|
__slots__ = []
|
||||||
|
|
||||||
@@ -106,6 +149,18 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class ErrorSyncResult(collections.namedtuple("ErrorSyncResult", [
|
||||||
|
"room_id", # str
|
||||||
|
"errcode", # str
|
||||||
|
"error", # str
|
||||||
|
])):
|
||||||
|
__slots__ = []
|
||||||
|
|
||||||
|
def __nonzero__(self):
|
||||||
|
"""Errors should always be reported to the client"""
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class SyncResult(collections.namedtuple("SyncResult", [
|
class SyncResult(collections.namedtuple("SyncResult", [
|
||||||
"next_batch", # Token for the next sync
|
"next_batch", # Token for the next sync
|
||||||
"presence", # List of presence events for the user.
|
"presence", # List of presence events for the user.
|
||||||
@@ -113,6 +168,9 @@ class SyncResult(collections.namedtuple("SyncResult", [
|
|||||||
"joined", # JoinedSyncResult for each joined room.
|
"joined", # JoinedSyncResult for each joined room.
|
||||||
"invited", # InvitedSyncResult for each invited room.
|
"invited", # InvitedSyncResult for each invited room.
|
||||||
"archived", # ArchivedSyncResult for each archived room.
|
"archived", # ArchivedSyncResult for each archived room.
|
||||||
|
"errors", # ErrorSyncResult
|
||||||
|
"pagination_info",
|
||||||
|
"unread_notifications",
|
||||||
])):
|
])):
|
||||||
__slots__ = []
|
__slots__ = []
|
||||||
|
|
||||||
@@ -140,8 +198,8 @@ class SyncHandler(object):
|
|||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.response_cache = ResponseCache()
|
self.response_cache = ResponseCache()
|
||||||
|
|
||||||
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
|
def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
|
||||||
full_state=False):
|
full_state=False, extras=DEFAULT_SYNC_EXTRAS):
|
||||||
"""Get the sync for a client if we have new data for it now. Otherwise
|
"""Get the sync for a client if we have new data for it now. Otherwise
|
||||||
wait for new data to arrive on the server. If the timeout expires, then
|
wait for new data to arrive on the server. If the timeout expires, then
|
||||||
return an empty sync result.
|
return an empty sync result.
|
||||||
@@ -153,48 +211,42 @@ class SyncHandler(object):
|
|||||||
result = self.response_cache.set(
|
result = self.response_cache.set(
|
||||||
sync_config.request_key,
|
sync_config.request_key,
|
||||||
self._wait_for_sync_for_user(
|
self._wait_for_sync_for_user(
|
||||||
sync_config, since_token, timeout, full_state
|
sync_config, batch_token, timeout, full_state, extras,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
|
def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
|
||||||
full_state):
|
full_state, extras=DEFAULT_SYNC_EXTRAS):
|
||||||
context = LoggingContext.current_context()
|
context = LoggingContext.current_context()
|
||||||
if context:
|
if context:
|
||||||
if since_token is None:
|
if batch_token is None:
|
||||||
context.tag = "initial_sync"
|
context.tag = "initial_sync"
|
||||||
elif full_state:
|
elif full_state:
|
||||||
context.tag = "full_state_sync"
|
context.tag = "full_state_sync"
|
||||||
else:
|
else:
|
||||||
context.tag = "incremental_sync"
|
context.tag = "incremental_sync"
|
||||||
|
|
||||||
if timeout == 0 or since_token is None or full_state:
|
if timeout == 0 or batch_token is None or full_state:
|
||||||
# we are going to return immediately, so don't bother calling
|
# we are going to return immediately, so don't bother calling
|
||||||
# notifier.wait_for_events.
|
# notifier.wait_for_events.
|
||||||
result = yield self.current_sync_for_user(
|
result = yield self.generate_sync_result(
|
||||||
sync_config, since_token, full_state=full_state,
|
sync_config, batch_token, full_state=full_state, extras=extras,
|
||||||
)
|
)
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
else:
|
else:
|
||||||
def current_sync_callback(before_token, after_token):
|
def current_sync_callback(before_token, after_token):
|
||||||
return self.current_sync_for_user(sync_config, since_token)
|
return self.generate_sync_result(
|
||||||
|
sync_config, batch_token, full_state=False, extras=extras,
|
||||||
|
)
|
||||||
|
|
||||||
result = yield self.notifier.wait_for_events(
|
result = yield self.notifier.wait_for_events(
|
||||||
sync_config.user.to_string(), timeout, current_sync_callback,
|
sync_config.user.to_string(), timeout, current_sync_callback,
|
||||||
from_token=since_token,
|
from_token=batch_token.stream_token,
|
||||||
)
|
)
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
def current_sync_for_user(self, sync_config, since_token=None,
|
|
||||||
full_state=False):
|
|
||||||
"""Get the sync for client needed to match what the server has now.
|
|
||||||
Returns:
|
|
||||||
A Deferred SyncResult.
|
|
||||||
"""
|
|
||||||
return self.generate_sync_result(sync_config, since_token, full_state)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def push_rules_for_user(self, user):
|
def push_rules_for_user(self, user):
|
||||||
user_id = user.to_string()
|
user_id = user.to_string()
|
||||||
@@ -490,13 +542,15 @@ class SyncHandler(object):
|
|||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def generate_sync_result(self, sync_config, since_token=None, full_state=False):
|
def generate_sync_result(self, sync_config, batch_token=None, full_state=False,
|
||||||
|
extras=DEFAULT_SYNC_EXTRAS):
|
||||||
"""Generates a sync result.
|
"""Generates a sync result.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
sync_config (SyncConfig)
|
sync_config (SyncConfig)
|
||||||
since_token (StreamToken)
|
since_token (StreamToken)
|
||||||
full_state (bool)
|
full_state (bool)
|
||||||
|
extras (SyncExtras)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred(SyncResult)
|
Deferred(SyncResult)
|
||||||
@@ -508,10 +562,16 @@ class SyncHandler(object):
|
|||||||
# Always use the `now_token` in `SyncResultBuilder`
|
# Always use the `now_token` in `SyncResultBuilder`
|
||||||
now_token = yield self.event_sources.get_current_token()
|
now_token = yield self.event_sources.get_current_token()
|
||||||
|
|
||||||
|
all_joined_rooms = yield self.store.get_rooms_for_user(
|
||||||
|
sync_config.user.to_string()
|
||||||
|
)
|
||||||
|
all_joined_rooms = [room.room_id for room in all_joined_rooms]
|
||||||
|
|
||||||
sync_result_builder = SyncResultBuilder(
|
sync_result_builder = SyncResultBuilder(
|
||||||
sync_config, full_state,
|
sync_config, full_state,
|
||||||
since_token=since_token,
|
batch_token=batch_token,
|
||||||
now_token=now_token,
|
now_token=now_token,
|
||||||
|
all_joined_rooms=all_joined_rooms,
|
||||||
)
|
)
|
||||||
|
|
||||||
account_data_by_room = yield self._generate_sync_entry_for_account_data(
|
account_data_by_room = yield self._generate_sync_entry_for_account_data(
|
||||||
@@ -519,7 +579,7 @@ class SyncHandler(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
res = yield self._generate_sync_entry_for_rooms(
|
res = yield self._generate_sync_entry_for_rooms(
|
||||||
sync_result_builder, account_data_by_room
|
sync_result_builder, account_data_by_room, extras,
|
||||||
)
|
)
|
||||||
newly_joined_rooms, newly_joined_users = res
|
newly_joined_rooms, newly_joined_users = res
|
||||||
|
|
||||||
@@ -527,15 +587,55 @@ class SyncHandler(object):
|
|||||||
sync_result_builder, newly_joined_rooms, newly_joined_users
|
sync_result_builder, newly_joined_rooms, newly_joined_users
|
||||||
)
|
)
|
||||||
|
|
||||||
|
yield self._generate_notification_counts(sync_result_builder)
|
||||||
|
|
||||||
defer.returnValue(SyncResult(
|
defer.returnValue(SyncResult(
|
||||||
presence=sync_result_builder.presence,
|
presence=sync_result_builder.presence,
|
||||||
account_data=sync_result_builder.account_data,
|
account_data=sync_result_builder.account_data,
|
||||||
joined=sync_result_builder.joined,
|
joined=sync_result_builder.joined,
|
||||||
invited=sync_result_builder.invited,
|
invited=sync_result_builder.invited,
|
||||||
archived=sync_result_builder.archived,
|
archived=sync_result_builder.archived,
|
||||||
next_batch=sync_result_builder.now_token,
|
errors=sync_result_builder.errors,
|
||||||
|
next_batch=SyncNextBatchToken(
|
||||||
|
stream_token=sync_result_builder.now_token,
|
||||||
|
pagination_state=sync_result_builder.pagination_state,
|
||||||
|
),
|
||||||
|
pagination_info=sync_result_builder.pagination_info,
|
||||||
|
unread_notifications=sync_result_builder.unread_notifications,
|
||||||
))
|
))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _generate_notification_counts(self, sync_result_builder):
|
||||||
|
rooms = sync_result_builder.all_joined_rooms
|
||||||
|
|
||||||
|
total_notif_count = [0]
|
||||||
|
rooms_with_notifs = set()
|
||||||
|
total_highlight_count = [0]
|
||||||
|
rooms_with_highlights = set()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def notif_for_room(room_id):
|
||||||
|
notifs = yield self.unread_notifs_for_room_id(
|
||||||
|
room_id, sync_result_builder.sync_config
|
||||||
|
)
|
||||||
|
if notifs is not None:
|
||||||
|
total_notif_count[0] += notifs["notify_count"]
|
||||||
|
total_highlight_count[0] += notifs["highlight_count"]
|
||||||
|
|
||||||
|
if notifs["notify_count"]:
|
||||||
|
rooms_with_notifs.add(room_id)
|
||||||
|
if notifs["highlight_count"]:
|
||||||
|
rooms_with_highlights.add(room_id)
|
||||||
|
|
||||||
|
yield concurrently_execute(notif_for_room, rooms, 10)
|
||||||
|
|
||||||
|
sync_result_builder.unread_notifications = {
|
||||||
|
"total_notification_count": total_notif_count[0],
|
||||||
|
"rooms_notification_count": len(rooms_with_notifs),
|
||||||
|
"total_highlight_count": total_highlight_count[0],
|
||||||
|
"rooms_highlight_count": len(rooms_with_highlights),
|
||||||
|
}
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _generate_sync_entry_for_account_data(self, sync_result_builder):
|
def _generate_sync_entry_for_account_data(self, sync_result_builder):
|
||||||
"""Generates the account data portion of the sync response. Populates
|
"""Generates the account data portion of the sync response. Populates
|
||||||
@@ -646,7 +746,8 @@ class SyncHandler(object):
|
|||||||
sync_result_builder.presence = presence
|
sync_result_builder.presence = presence
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
|
def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room,
|
||||||
|
extras):
|
||||||
"""Generates the rooms portion of the sync response. Populates the
|
"""Generates the rooms portion of the sync response. Populates the
|
||||||
`sync_result_builder` with the result.
|
`sync_result_builder` with the result.
|
||||||
|
|
||||||
@@ -690,6 +791,12 @@ class SyncHandler(object):
|
|||||||
|
|
||||||
tags_by_room = yield self.store.get_tags_for_user(user_id)
|
tags_by_room = yield self.store.get_tags_for_user(user_id)
|
||||||
|
|
||||||
|
yield self._update_room_entries_for_paginated_sync(
|
||||||
|
sync_result_builder, room_entries, extras
|
||||||
|
)
|
||||||
|
|
||||||
|
sync_result_builder.full_state |= sync_result_builder.since_token is None
|
||||||
|
|
||||||
def handle_room_entries(room_entry):
|
def handle_room_entries(room_entry):
|
||||||
return self._generate_room_entry(
|
return self._generate_room_entry(
|
||||||
sync_result_builder,
|
sync_result_builder,
|
||||||
@@ -698,7 +805,6 @@ class SyncHandler(object):
|
|||||||
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
|
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
|
||||||
tags=tags_by_room.get(room_entry.room_id),
|
tags=tags_by_room.get(room_entry.room_id),
|
||||||
account_data=account_data_by_room.get(room_entry.room_id, {}),
|
account_data=account_data_by_room.get(room_entry.room_id, {}),
|
||||||
always_include=sync_result_builder.full_state,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
yield concurrently_execute(handle_room_entries, room_entries, 10)
|
yield concurrently_execute(handle_room_entries, room_entries, 10)
|
||||||
@@ -719,6 +825,162 @@ class SyncHandler(object):
|
|||||||
|
|
||||||
defer.returnValue((newly_joined_rooms, newly_joined_users))
|
defer.returnValue((newly_joined_rooms, newly_joined_users))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _update_room_entries_for_paginated_sync(self, sync_result_builder,
|
||||||
|
room_entries, extras):
|
||||||
|
"""Works out which room_entries should be synced to the client, which
|
||||||
|
would need to be resynced if they were sent down, etc.
|
||||||
|
|
||||||
|
Mutates room_entries.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sync_result_builder (SyncResultBuilder)
|
||||||
|
room_entries (list(RoomSyncResultBuilder))
|
||||||
|
extras (SyncExtras)
|
||||||
|
"""
|
||||||
|
user_id = sync_result_builder.sync_config.user.to_string()
|
||||||
|
sync_config = sync_result_builder.sync_config
|
||||||
|
|
||||||
|
if sync_config.pagination_config:
|
||||||
|
pagination_config = sync_config.pagination_config
|
||||||
|
old_pagination_value = 0
|
||||||
|
include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL
|
||||||
|
elif sync_result_builder.pagination_state:
|
||||||
|
pagination_config = SyncPaginationConfig(
|
||||||
|
order=sync_result_builder.pagination_state.order,
|
||||||
|
limit=sync_result_builder.pagination_state.limit,
|
||||||
|
tags=sync_result_builder.pagination_state.tags,
|
||||||
|
)
|
||||||
|
old_pagination_value = sync_result_builder.pagination_state.value
|
||||||
|
include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL
|
||||||
|
else:
|
||||||
|
pagination_config = None
|
||||||
|
old_pagination_value = 0
|
||||||
|
include_all_tags = False
|
||||||
|
|
||||||
|
if sync_result_builder.pagination_state:
|
||||||
|
missing_state = yield self._get_rooms_that_need_full_state(
|
||||||
|
room_ids=[r.room_id for r in room_entries],
|
||||||
|
sync_config=sync_config,
|
||||||
|
since_token=sync_result_builder.since_token,
|
||||||
|
pagination_state=sync_result_builder.pagination_state,
|
||||||
|
)
|
||||||
|
|
||||||
|
all_tags = yield self.store.get_tags_for_user(user_id)
|
||||||
|
|
||||||
|
if sync_result_builder.since_token:
|
||||||
|
stream_id = sync_result_builder.since_token.account_data_key
|
||||||
|
now_stream_id = sync_result_builder.now_token.account_data_key
|
||||||
|
tag_changes = yield self.store.get_room_tags_changed(
|
||||||
|
user_id, stream_id, now_stream_id
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
tag_changes = {}
|
||||||
|
|
||||||
|
if missing_state:
|
||||||
|
for r in room_entries:
|
||||||
|
if r.room_id in missing_state:
|
||||||
|
if include_all_tags:
|
||||||
|
# If we're always including tagged rooms, then only
|
||||||
|
# resync rooms which are newly tagged.
|
||||||
|
change = tag_changes.get(r.room_id)
|
||||||
|
if change == TAG_CHANGE_NEWLY_TAGGED:
|
||||||
|
r.always_include = True
|
||||||
|
r.would_require_resync = True
|
||||||
|
r.synced = True
|
||||||
|
continue
|
||||||
|
elif change == TAG_CHANGE_ALL_REMOVED:
|
||||||
|
r.always_include = True
|
||||||
|
r.synced = False
|
||||||
|
continue
|
||||||
|
elif r.room_id in all_tags:
|
||||||
|
r.always_include = True
|
||||||
|
continue
|
||||||
|
|
||||||
|
if r.room_id in extras.peek:
|
||||||
|
since = extras.peek[r.room_id].get("since", None)
|
||||||
|
if since:
|
||||||
|
tok = SyncNextBatchToken.from_string(since)
|
||||||
|
r.since_token = tok.stream_token
|
||||||
|
else:
|
||||||
|
r.always_include = True
|
||||||
|
r.would_require_resync = True
|
||||||
|
r.synced = False
|
||||||
|
else:
|
||||||
|
r.would_require_resync = True
|
||||||
|
|
||||||
|
elif pagination_config and include_all_tags:
|
||||||
|
all_tags = yield self.store.get_tags_for_user(user_id)
|
||||||
|
|
||||||
|
for r in room_entries:
|
||||||
|
if r.room_id in all_tags:
|
||||||
|
r.always_include = True
|
||||||
|
|
||||||
|
for room_id in set(extras.peek.keys()) - {r.room_id for r in room_entries}:
|
||||||
|
sync_result_builder.errors.append(ErrorSyncResult(
|
||||||
|
room_id=room_id,
|
||||||
|
errcode=Codes.CANNOT_PEEK,
|
||||||
|
error="Cannot peek into requested room",
|
||||||
|
))
|
||||||
|
|
||||||
|
if pagination_config:
|
||||||
|
room_ids = [r.room_id for r in room_entries]
|
||||||
|
pagination_limit = pagination_config.limit
|
||||||
|
|
||||||
|
extra_limit = extras.paginate.get("limit", 0)
|
||||||
|
|
||||||
|
room_map = yield self._get_room_timestamps_at_token(
|
||||||
|
room_ids, sync_result_builder.now_token, sync_config,
|
||||||
|
pagination_limit + extra_limit + 1,
|
||||||
|
)
|
||||||
|
|
||||||
|
limited = False
|
||||||
|
if room_map:
|
||||||
|
sorted_list = sorted(
|
||||||
|
room_map.items(),
|
||||||
|
key=lambda item: -item[1]
|
||||||
|
)
|
||||||
|
|
||||||
|
cutoff_list = sorted_list[:pagination_limit + extra_limit]
|
||||||
|
|
||||||
|
if cutoff_list[pagination_limit:]:
|
||||||
|
new_room_ids = set(r[0] for r in cutoff_list[pagination_limit:])
|
||||||
|
for r in room_entries:
|
||||||
|
if r.room_id in new_room_ids:
|
||||||
|
r.always_include = True
|
||||||
|
r.would_require_resync = True
|
||||||
|
|
||||||
|
_, bottom_ts = cutoff_list[-1]
|
||||||
|
new_pagination_value = bottom_ts
|
||||||
|
|
||||||
|
# We're limited if there are any rooms that are after cutoff
|
||||||
|
# in the list, but still have an origin server ts from after
|
||||||
|
# the pagination value from the since token.
|
||||||
|
limited = any(
|
||||||
|
old_pagination_value < r[1]
|
||||||
|
for r in sorted_list[pagination_limit + extra_limit:]
|
||||||
|
)
|
||||||
|
|
||||||
|
sync_result_builder.pagination_state = SyncPaginationState(
|
||||||
|
order=pagination_config.order, value=new_pagination_value,
|
||||||
|
limit=pagination_limit + extra_limit,
|
||||||
|
tags=pagination_config.tags,
|
||||||
|
)
|
||||||
|
|
||||||
|
to_sync_map = dict(cutoff_list)
|
||||||
|
else:
|
||||||
|
to_sync_map = {}
|
||||||
|
|
||||||
|
sync_result_builder.pagination_info["limited"] = limited
|
||||||
|
|
||||||
|
if len(room_map) == len(room_entries):
|
||||||
|
sync_result_builder.pagination_state = None
|
||||||
|
|
||||||
|
room_entries[:] = [
|
||||||
|
r for r in room_entries
|
||||||
|
if r.room_id in to_sync_map or r.always_include
|
||||||
|
]
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_rooms_changed(self, sync_result_builder, ignored_users):
|
def _get_rooms_changed(self, sync_result_builder, ignored_users):
|
||||||
"""Gets the the changes that have happened since the last sync.
|
"""Gets the the changes that have happened since the last sync.
|
||||||
@@ -809,7 +1071,6 @@ class SyncHandler(object):
|
|||||||
rtype="archived",
|
rtype="archived",
|
||||||
events=None,
|
events=None,
|
||||||
newly_joined=room_id in newly_joined_rooms,
|
newly_joined=room_id in newly_joined_rooms,
|
||||||
full_state=False,
|
|
||||||
since_token=since_token,
|
since_token=since_token,
|
||||||
upto_token=leave_token,
|
upto_token=leave_token,
|
||||||
))
|
))
|
||||||
@@ -839,7 +1100,6 @@ class SyncHandler(object):
|
|||||||
rtype="joined",
|
rtype="joined",
|
||||||
events=events,
|
events=events,
|
||||||
newly_joined=room_id in newly_joined_rooms,
|
newly_joined=room_id in newly_joined_rooms,
|
||||||
full_state=False,
|
|
||||||
since_token=None if room_id in newly_joined_rooms else since_token,
|
since_token=None if room_id in newly_joined_rooms else since_token,
|
||||||
upto_token=prev_batch_token,
|
upto_token=prev_batch_token,
|
||||||
))
|
))
|
||||||
@@ -849,7 +1109,6 @@ class SyncHandler(object):
|
|||||||
rtype="joined",
|
rtype="joined",
|
||||||
events=[],
|
events=[],
|
||||||
newly_joined=room_id in newly_joined_rooms,
|
newly_joined=room_id in newly_joined_rooms,
|
||||||
full_state=False,
|
|
||||||
since_token=since_token,
|
since_token=since_token,
|
||||||
upto_token=since_token,
|
upto_token=since_token,
|
||||||
))
|
))
|
||||||
@@ -893,7 +1152,6 @@ class SyncHandler(object):
|
|||||||
rtype="joined",
|
rtype="joined",
|
||||||
events=None,
|
events=None,
|
||||||
newly_joined=False,
|
newly_joined=False,
|
||||||
full_state=True,
|
|
||||||
since_token=since_token,
|
since_token=since_token,
|
||||||
upto_token=now_token,
|
upto_token=now_token,
|
||||||
))
|
))
|
||||||
@@ -920,7 +1178,6 @@ class SyncHandler(object):
|
|||||||
rtype="archived",
|
rtype="archived",
|
||||||
events=None,
|
events=None,
|
||||||
newly_joined=False,
|
newly_joined=False,
|
||||||
full_state=True,
|
|
||||||
since_token=since_token,
|
since_token=since_token,
|
||||||
upto_token=leave_token,
|
upto_token=leave_token,
|
||||||
))
|
))
|
||||||
@@ -929,8 +1186,7 @@ class SyncHandler(object):
|
|||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _generate_room_entry(self, sync_result_builder, ignored_users,
|
def _generate_room_entry(self, sync_result_builder, ignored_users,
|
||||||
room_builder, ephemeral, tags, account_data,
|
room_builder, ephemeral, tags, account_data):
|
||||||
always_include=False):
|
|
||||||
"""Populates the `joined` and `archived` section of `sync_result_builder`
|
"""Populates the `joined` and `archived` section of `sync_result_builder`
|
||||||
based on the `room_builder`.
|
based on the `room_builder`.
|
||||||
|
|
||||||
@@ -946,19 +1202,23 @@ class SyncHandler(object):
|
|||||||
even if empty.
|
even if empty.
|
||||||
"""
|
"""
|
||||||
newly_joined = room_builder.newly_joined
|
newly_joined = room_builder.newly_joined
|
||||||
full_state = (
|
always_include = (
|
||||||
room_builder.full_state
|
newly_joined
|
||||||
or newly_joined
|
|
||||||
or sync_result_builder.full_state
|
or sync_result_builder.full_state
|
||||||
|
or room_builder.always_include
|
||||||
|
)
|
||||||
|
full_state = (
|
||||||
|
newly_joined
|
||||||
|
or sync_result_builder.full_state
|
||||||
|
or room_builder.would_require_resync
|
||||||
)
|
)
|
||||||
events = room_builder.events
|
events = room_builder.events
|
||||||
|
|
||||||
# We want to shortcut out as early as possible.
|
# We want to shortcut out as early as possible.
|
||||||
if not (always_include or account_data or ephemeral or full_state):
|
if not (always_include or account_data or ephemeral):
|
||||||
if events == [] and tags is None:
|
if events == [] and tags is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
since_token = sync_result_builder.since_token
|
|
||||||
now_token = sync_result_builder.now_token
|
now_token = sync_result_builder.now_token
|
||||||
sync_config = sync_result_builder.sync_config
|
sync_config = sync_result_builder.sync_config
|
||||||
|
|
||||||
@@ -993,9 +1253,20 @@ class SyncHandler(object):
|
|||||||
|
|
||||||
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
|
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
|
||||||
|
|
||||||
if not (always_include or batch or account_data or ephemeral or full_state):
|
if not (always_include or batch or account_data or ephemeral):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# At this point we're guarenteed (?) to send down the room, so if we
|
||||||
|
# need to resync the entire room do so now.
|
||||||
|
if room_builder.would_require_resync:
|
||||||
|
batch = yield self._load_filtered_recents(
|
||||||
|
room_id, sync_config,
|
||||||
|
now_token=upto_token,
|
||||||
|
since_token=None,
|
||||||
|
recents=None,
|
||||||
|
newly_joined_room=newly_joined,
|
||||||
|
)
|
||||||
|
|
||||||
state = yield self.compute_state_delta(
|
state = yield self.compute_state_delta(
|
||||||
room_id, batch, sync_config, since_token, now_token,
|
room_id, batch, sync_config, since_token, now_token,
|
||||||
full_state=full_state
|
full_state=full_state
|
||||||
@@ -1010,6 +1281,7 @@ class SyncHandler(object):
|
|||||||
ephemeral=ephemeral,
|
ephemeral=ephemeral,
|
||||||
account_data=account_data_events,
|
account_data=account_data_events,
|
||||||
unread_notifications=unread_notifications,
|
unread_notifications=unread_notifications,
|
||||||
|
synced=room_builder.synced,
|
||||||
)
|
)
|
||||||
|
|
||||||
if room_sync or always_include:
|
if room_sync or always_include:
|
||||||
@@ -1034,6 +1306,90 @@ class SyncHandler(object):
|
|||||||
else:
|
else:
|
||||||
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
|
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit):
|
||||||
|
"""For each room, get the last origin_server_ts timestamp the client
|
||||||
|
would see (after filtering) at a particular token.
|
||||||
|
|
||||||
|
Only attempts finds the latest `limit` room timestamps.
|
||||||
|
"""
|
||||||
|
room_to_entries = {}
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _get_last_ts(room_id):
|
||||||
|
entry = yield self.store.get_last_event_id_ts_for_room(
|
||||||
|
room_id, token.room_key
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: Is this ever possible?
|
||||||
|
room_to_entries[room_id] = entry if entry else {
|
||||||
|
"origin_server_ts": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
yield concurrently_execute(_get_last_ts, room_ids, 10)
|
||||||
|
|
||||||
|
if len(room_to_entries) <= limit:
|
||||||
|
defer.returnValue({
|
||||||
|
room_id: entry["origin_server_ts"]
|
||||||
|
for room_id, entry in room_to_entries.items()
|
||||||
|
})
|
||||||
|
|
||||||
|
queued_events = sorted(
|
||||||
|
room_to_entries.items(),
|
||||||
|
key=lambda e: -e[1]["origin_server_ts"]
|
||||||
|
)
|
||||||
|
|
||||||
|
to_return = {}
|
||||||
|
|
||||||
|
while len(to_return) < limit and len(queued_events) > 0:
|
||||||
|
to_fetch = queued_events[:limit - len(to_return)]
|
||||||
|
event_to_q = {
|
||||||
|
e["event_id"]: (room_id, e) for room_id, e in to_fetch
|
||||||
|
if "event_id" in e
|
||||||
|
}
|
||||||
|
|
||||||
|
# Now we fetch each event to check if its been filtered out
|
||||||
|
event_map = yield self.store.get_events(event_to_q.keys())
|
||||||
|
|
||||||
|
recents = sync_config.filter_collection.filter_room_timeline(
|
||||||
|
event_map.values()
|
||||||
|
)
|
||||||
|
recents = yield filter_events_for_client(
|
||||||
|
self.store,
|
||||||
|
sync_config.user.to_string(),
|
||||||
|
recents,
|
||||||
|
)
|
||||||
|
|
||||||
|
to_return.update({r.room_id: r.origin_server_ts for r in recents})
|
||||||
|
|
||||||
|
for ev_id in set(event_map.keys()) - set(r.event_id for r in recents):
|
||||||
|
queued_events.append(event_to_q[ev_id])
|
||||||
|
|
||||||
|
# FIXME: Need to refetch TS
|
||||||
|
queued_events.sort(key=lambda e: -e[1]["origin_server_ts"])
|
||||||
|
|
||||||
|
defer.returnValue(to_return)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token,
|
||||||
|
pagination_state):
|
||||||
|
"""Work out which rooms we haven't sent to the client yet, so would
|
||||||
|
require us to send down the full state
|
||||||
|
"""
|
||||||
|
start_ts = yield self._get_room_timestamps_at_token(
|
||||||
|
room_ids, since_token,
|
||||||
|
sync_config=sync_config,
|
||||||
|
limit=len(room_ids),
|
||||||
|
)
|
||||||
|
|
||||||
|
missing_list = frozenset(
|
||||||
|
room_id for room_id, ts in
|
||||||
|
sorted(start_ts.items(), key=lambda item: -item[1])
|
||||||
|
if ts < pagination_state.value
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue(missing_list)
|
||||||
|
|
||||||
|
|
||||||
def _action_has_highlight(actions):
|
def _action_has_highlight(actions):
|
||||||
for action in actions:
|
for action in actions:
|
||||||
@@ -1085,31 +1441,53 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
|
|||||||
|
|
||||||
class SyncResultBuilder(object):
|
class SyncResultBuilder(object):
|
||||||
"Used to help build up a new SyncResult for a user"
|
"Used to help build up a new SyncResult for a user"
|
||||||
def __init__(self, sync_config, full_state, since_token, now_token):
|
|
||||||
|
__slots__ = (
|
||||||
|
"sync_config", "full_state", "batch_token", "since_token", "pagination_state",
|
||||||
|
"now_token", "presence", "account_data", "joined", "invited", "archived",
|
||||||
|
"pagination_info", "errors", "all_joined_rooms", "unread_notifications",
|
||||||
|
)
|
||||||
|
|
||||||
|
def __init__(self, sync_config, full_state, batch_token, now_token,
|
||||||
|
all_joined_rooms):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
sync_config(SyncConfig)
|
sync_config(SyncConfig)
|
||||||
full_state(bool): The full_state flag as specified by user
|
full_state(bool): The full_state flag as specified by user
|
||||||
since_token(StreamToken): The token supplied by user, or None.
|
batch_token(SyncNextBatchToken): The token supplied by user, or None.
|
||||||
now_token(StreamToken): The token to sync up to.
|
now_token(StreamToken): The token to sync up to.
|
||||||
|
all_joined_rooms(list(str)): List of all joined room ids.
|
||||||
"""
|
"""
|
||||||
self.sync_config = sync_config
|
self.sync_config = sync_config
|
||||||
self.full_state = full_state
|
self.full_state = full_state
|
||||||
self.since_token = since_token
|
self.batch_token = batch_token
|
||||||
|
self.since_token = batch_token.stream_token if batch_token else None
|
||||||
|
self.pagination_state = batch_token.pagination_state if batch_token else None
|
||||||
self.now_token = now_token
|
self.now_token = now_token
|
||||||
|
self.all_joined_rooms = all_joined_rooms
|
||||||
|
|
||||||
self.presence = []
|
self.presence = []
|
||||||
self.account_data = []
|
self.account_data = []
|
||||||
self.joined = []
|
self.joined = []
|
||||||
self.invited = []
|
self.invited = []
|
||||||
self.archived = []
|
self.archived = []
|
||||||
|
self.errors = []
|
||||||
|
|
||||||
|
self.pagination_info = {}
|
||||||
|
self.unread_notifications = {}
|
||||||
|
|
||||||
|
|
||||||
class RoomSyncResultBuilder(object):
|
class RoomSyncResultBuilder(object):
|
||||||
"""Stores information needed to create either a `JoinedSyncResult` or
|
"""Stores information needed to create either a `JoinedSyncResult` or
|
||||||
`ArchivedSyncResult`.
|
`ArchivedSyncResult`.
|
||||||
"""
|
"""
|
||||||
def __init__(self, room_id, rtype, events, newly_joined, full_state,
|
|
||||||
|
__slots__ = (
|
||||||
|
"room_id", "rtype", "events", "newly_joined", "since_token",
|
||||||
|
"upto_token", "always_include", "would_require_resync", "synced",
|
||||||
|
)
|
||||||
|
|
||||||
|
def __init__(self, room_id, rtype, events, newly_joined,
|
||||||
since_token, upto_token):
|
since_token, upto_token):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
@@ -1118,7 +1496,6 @@ class RoomSyncResultBuilder(object):
|
|||||||
events(list): List of events to include in the room, (more events
|
events(list): List of events to include in the room, (more events
|
||||||
may be added when generating result).
|
may be added when generating result).
|
||||||
newly_joined(bool): If the user has newly joined the room
|
newly_joined(bool): If the user has newly joined the room
|
||||||
full_state(bool): Whether the full state should be sent in result
|
|
||||||
since_token(StreamToken): Earliest point to return events from, or None
|
since_token(StreamToken): Earliest point to return events from, or None
|
||||||
upto_token(StreamToken): Latest point to return events from.
|
upto_token(StreamToken): Latest point to return events from.
|
||||||
"""
|
"""
|
||||||
@@ -1126,6 +1503,12 @@ class RoomSyncResultBuilder(object):
|
|||||||
self.rtype = rtype
|
self.rtype = rtype
|
||||||
self.events = events
|
self.events = events
|
||||||
self.newly_joined = newly_joined
|
self.newly_joined = newly_joined
|
||||||
self.full_state = full_state
|
|
||||||
self.since_token = since_token
|
self.since_token = since_token
|
||||||
self.upto_token = upto_token
|
self.upto_token = upto_token
|
||||||
|
|
||||||
|
# Should this room always be included in the sync?
|
||||||
|
self.always_include = False
|
||||||
|
# If we send down this room, should we send down the full state?
|
||||||
|
self.would_require_resync = False
|
||||||
|
# Should the client consider this room "synced"?
|
||||||
|
self.synced = True
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ REQUIREMENTS = {
|
|||||||
"blist": ["blist"],
|
"blist": ["blist"],
|
||||||
"pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
|
"pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
|
||||||
"pymacaroons-pynacl": ["pymacaroons"],
|
"pymacaroons-pynacl": ["pymacaroons"],
|
||||||
|
"cbor2": ["cbor2"],
|
||||||
}
|
}
|
||||||
CONDITIONAL_REQUIREMENTS = {
|
CONDITIONAL_REQUIREMENTS = {
|
||||||
"web_client": {
|
"web_client": {
|
||||||
|
|||||||
@@ -51,6 +51,9 @@ class SlavedAccountDataStore(BaseSlavedStore):
|
|||||||
get_updated_account_data_for_user = (
|
get_updated_account_data_for_user = (
|
||||||
DataStore.get_updated_account_data_for_user.__func__
|
DataStore.get_updated_account_data_for_user.__func__
|
||||||
)
|
)
|
||||||
|
get_room_tags_changed = (
|
||||||
|
DataStore.get_room_tags_changed.__func__
|
||||||
|
)
|
||||||
|
|
||||||
def get_max_account_data_stream_id(self):
|
def get_max_account_data_stream_id(self):
|
||||||
return self._account_data_id_gen.get_current_token()
|
return self._account_data_id_gen.get_current_token()
|
||||||
|
|||||||
@@ -144,6 +144,8 @@ class SlavedEventStore(BaseSlavedStore):
|
|||||||
_get_events_around_txn = DataStore._get_events_around_txn.__func__
|
_get_events_around_txn = DataStore._get_events_around_txn.__func__
|
||||||
_get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
|
_get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
|
||||||
|
|
||||||
|
get_last_event_id_ts_for_room = DataStore.get_last_event_id_ts_for_room.__func__
|
||||||
|
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
result = super(SlavedEventStore, self).stream_positions()
|
result = super(SlavedEventStore, self).stream_positions()
|
||||||
result["events"] = self._stream_id_gen.get_current_token()
|
result["events"] = self._stream_id_gen.get_current_token()
|
||||||
|
|||||||
@@ -16,10 +16,14 @@
|
|||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.http.servlet import (
|
from synapse.http.servlet import (
|
||||||
RestServlet, parse_string, parse_integer, parse_boolean
|
RestServlet, parse_string, parse_integer, parse_boolean,
|
||||||
|
parse_json_object_from_request,
|
||||||
)
|
)
|
||||||
from synapse.handlers.sync import SyncConfig
|
from synapse.handlers.sync import (
|
||||||
from synapse.types import StreamToken
|
SyncConfig, SyncPaginationConfig, SYNC_PAGINATION_TAGS_IGNORE, SyncExtras,
|
||||||
|
DEFAULT_SYNC_EXTRAS,
|
||||||
|
)
|
||||||
|
from synapse.types import SyncNextBatchToken
|
||||||
from synapse.events.utils import (
|
from synapse.events.utils import (
|
||||||
serialize_event, format_event_for_client_v2_without_room_id,
|
serialize_event, format_event_for_client_v2_without_room_id,
|
||||||
)
|
)
|
||||||
@@ -84,6 +88,94 @@ class SyncRestServlet(RestServlet):
|
|||||||
self.filtering = hs.get_filtering()
|
self.filtering = hs.get_filtering()
|
||||||
self.presence_handler = hs.get_presence_handler()
|
self.presence_handler = hs.get_presence_handler()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_POST(self, request):
|
||||||
|
requester = yield self.auth.get_user_by_req(
|
||||||
|
request, allow_guest=True
|
||||||
|
)
|
||||||
|
user = requester.user
|
||||||
|
|
||||||
|
body = parse_json_object_from_request(request)
|
||||||
|
|
||||||
|
timeout = body.get("timeout", 0)
|
||||||
|
since = body.get("since", None)
|
||||||
|
|
||||||
|
extras = body.get("extras", {})
|
||||||
|
extras = SyncExtras(
|
||||||
|
paginate=extras.get("paginate", {}),
|
||||||
|
peek=extras.get("peek", {}),
|
||||||
|
)
|
||||||
|
|
||||||
|
if "from" in body:
|
||||||
|
# /events used to use 'from', but /sync uses 'since'.
|
||||||
|
# Lets be helpful and whine if we see a 'from'.
|
||||||
|
raise SynapseError(
|
||||||
|
400, "'from' is not a valid parameter. Did you mean 'since'?"
|
||||||
|
)
|
||||||
|
|
||||||
|
set_presence = body.get("set_presence", "online")
|
||||||
|
if set_presence not in self.ALLOWED_PRESENCE:
|
||||||
|
message = "Parameter 'set_presence' must be one of [%s]" % (
|
||||||
|
", ".join(repr(v) for v in self.ALLOWED_PRESENCE)
|
||||||
|
)
|
||||||
|
raise SynapseError(400, message)
|
||||||
|
|
||||||
|
full_state = body.get("full_state", False)
|
||||||
|
|
||||||
|
filter_id = body.get("filter_id", None)
|
||||||
|
filter_dict = body.get("filter", None)
|
||||||
|
pagination_config = body.get("pagination_config", None)
|
||||||
|
|
||||||
|
if filter_dict is not None and filter_id is not None:
|
||||||
|
raise SynapseError(
|
||||||
|
400,
|
||||||
|
"Can only specify one of `filter` and `filter_id` paramters"
|
||||||
|
)
|
||||||
|
|
||||||
|
if filter_id:
|
||||||
|
filter_collection = yield self.filtering.get_user_filter(
|
||||||
|
user.localpart, filter_id
|
||||||
|
)
|
||||||
|
filter_key = filter_id
|
||||||
|
elif filter_dict:
|
||||||
|
self.filtering.check_valid_filter(filter_dict)
|
||||||
|
filter_collection = FilterCollection(filter_dict)
|
||||||
|
filter_key = json.dumps(filter_dict)
|
||||||
|
else:
|
||||||
|
filter_collection = DEFAULT_FILTER_COLLECTION
|
||||||
|
filter_key = None
|
||||||
|
|
||||||
|
request_key = (user, timeout, since, filter_key, full_state)
|
||||||
|
|
||||||
|
sync_config = SyncConfig(
|
||||||
|
user=user,
|
||||||
|
filter_collection=filter_collection,
|
||||||
|
is_guest=requester.is_guest,
|
||||||
|
request_key=request_key,
|
||||||
|
pagination_config=SyncPaginationConfig(
|
||||||
|
order=pagination_config["order"],
|
||||||
|
limit=pagination_config["limit"],
|
||||||
|
tags=pagination_config.get("tags", SYNC_PAGINATION_TAGS_IGNORE),
|
||||||
|
) if pagination_config else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
if since is not None:
|
||||||
|
batch_token = SyncNextBatchToken.from_string(since)
|
||||||
|
else:
|
||||||
|
batch_token = None
|
||||||
|
|
||||||
|
sync_result = yield self._handle_sync(
|
||||||
|
requester=requester,
|
||||||
|
sync_config=sync_config,
|
||||||
|
batch_token=batch_token,
|
||||||
|
set_presence=set_presence,
|
||||||
|
full_state=full_state,
|
||||||
|
timeout=timeout,
|
||||||
|
extras=extras,
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue(sync_result)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_GET(self, request):
|
def on_GET(self, request):
|
||||||
if "from" in request.args:
|
if "from" in request.args:
|
||||||
@@ -107,13 +199,6 @@ class SyncRestServlet(RestServlet):
|
|||||||
filter_id = parse_string(request, "filter", default=None)
|
filter_id = parse_string(request, "filter", default=None)
|
||||||
full_state = parse_boolean(request, "full_state", default=False)
|
full_state = parse_boolean(request, "full_state", default=False)
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"/sync: user=%r, timeout=%r, since=%r,"
|
|
||||||
" set_presence=%r, filter_id=%r" % (
|
|
||||||
user, timeout, since, set_presence, filter_id
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
request_key = (user, timeout, since, filter_id, full_state)
|
request_key = (user, timeout, since, filter_id, full_state)
|
||||||
|
|
||||||
if filter_id:
|
if filter_id:
|
||||||
@@ -136,15 +221,39 @@ class SyncRestServlet(RestServlet):
|
|||||||
filter_collection=filter,
|
filter_collection=filter,
|
||||||
is_guest=requester.is_guest,
|
is_guest=requester.is_guest,
|
||||||
request_key=request_key,
|
request_key=request_key,
|
||||||
|
pagination_config=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
if since is not None:
|
if since is not None:
|
||||||
since_token = StreamToken.from_string(since)
|
batch_token = SyncNextBatchToken.from_string(since)
|
||||||
else:
|
else:
|
||||||
since_token = None
|
batch_token = None
|
||||||
|
|
||||||
|
sync_result = yield self._handle_sync(
|
||||||
|
requester=requester,
|
||||||
|
sync_config=sync_config,
|
||||||
|
batch_token=batch_token,
|
||||||
|
set_presence=set_presence,
|
||||||
|
full_state=full_state,
|
||||||
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue(sync_result)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _handle_sync(self, requester, sync_config, batch_token, set_presence,
|
||||||
|
full_state, timeout, extras=DEFAULT_SYNC_EXTRAS):
|
||||||
affect_presence = set_presence != PresenceState.OFFLINE
|
affect_presence = set_presence != PresenceState.OFFLINE
|
||||||
|
|
||||||
|
user = sync_config.user
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"/sync: user=%r, timeout=%r, since=%r,"
|
||||||
|
" set_presence=%r" % (
|
||||||
|
user, timeout, batch_token, set_presence
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
if affect_presence:
|
if affect_presence:
|
||||||
yield self.presence_handler.set_state(user, {"presence": set_presence})
|
yield self.presence_handler.set_state(user, {"presence": set_presence})
|
||||||
|
|
||||||
@@ -153,8 +262,8 @@ class SyncRestServlet(RestServlet):
|
|||||||
)
|
)
|
||||||
with context:
|
with context:
|
||||||
sync_result = yield self.sync_handler.wait_for_sync_for_user(
|
sync_result = yield self.sync_handler.wait_for_sync_for_user(
|
||||||
sync_config, since_token=since_token, timeout=timeout,
|
sync_config, batch_token=batch_token, timeout=timeout,
|
||||||
full_state=full_state
|
full_state=full_state, extras=extras,
|
||||||
)
|
)
|
||||||
|
|
||||||
time_now = self.clock.time_msec()
|
time_now = self.clock.time_msec()
|
||||||
@@ -182,8 +291,15 @@ class SyncRestServlet(RestServlet):
|
|||||||
"leave": archived,
|
"leave": archived,
|
||||||
},
|
},
|
||||||
"next_batch": sync_result.next_batch.to_string(),
|
"next_batch": sync_result.next_batch.to_string(),
|
||||||
|
"unread_notifications": sync_result.unread_notifications,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if sync_result.errors:
|
||||||
|
response_content["rooms"]["errors"] = self.encode_errors(sync_result.errors)
|
||||||
|
|
||||||
|
if sync_result.pagination_info:
|
||||||
|
response_content["pagination_info"] = sync_result.pagination_info
|
||||||
|
|
||||||
defer.returnValue((200, response_content))
|
defer.returnValue((200, response_content))
|
||||||
|
|
||||||
def encode_presence(self, events, time_now):
|
def encode_presence(self, events, time_now):
|
||||||
@@ -194,6 +310,15 @@ class SyncRestServlet(RestServlet):
|
|||||||
formatted.append(event)
|
formatted.append(event)
|
||||||
return {"events": formatted}
|
return {"events": formatted}
|
||||||
|
|
||||||
|
def encode_errors(self, errors):
|
||||||
|
return {
|
||||||
|
e.room_id: {
|
||||||
|
"errcode": e.errcode,
|
||||||
|
"error": e.error
|
||||||
|
}
|
||||||
|
for e in errors
|
||||||
|
}
|
||||||
|
|
||||||
def encode_joined(self, rooms, time_now, token_id):
|
def encode_joined(self, rooms, time_now, token_id):
|
||||||
"""
|
"""
|
||||||
Encode the joined rooms in a sync result
|
Encode the joined rooms in a sync result
|
||||||
@@ -215,6 +340,7 @@ class SyncRestServlet(RestServlet):
|
|||||||
joined[room.room_id] = self.encode_room(
|
joined[room.room_id] = self.encode_room(
|
||||||
room, time_now, token_id
|
room, time_now, token_id
|
||||||
)
|
)
|
||||||
|
joined[room.room_id]["synced"] = room.synced
|
||||||
|
|
||||||
return joined
|
return joined
|
||||||
|
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
# Remember to update this number every time a change is made to database
|
# Remember to update this number every time a change is made to database
|
||||||
# schema files, so the users will be informed on server restarts.
|
# schema files, so the users will be informed on server restarts.
|
||||||
SCHEMA_VERSION = 32
|
SCHEMA_VERSION = 33
|
||||||
|
|
||||||
dir_path = os.path.abspath(os.path.dirname(__file__))
|
dir_path = os.path.abspath(os.path.dirname(__file__))
|
||||||
|
|
||||||
|
|||||||
24
synapse/storage/schema/delta/33/tag_changes.sql
Normal file
24
synapse/storage/schema/delta/33/tag_changes.sql
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
/* Copyright 2016 OpenMarket Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
CREATE TABLE room_tags_change_revisions(
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
room_id TEXT NOT NULL,
|
||||||
|
stream_id BIGINT NOT NULL,
|
||||||
|
change TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX room_tags_change_revisions_rm_idx ON room_tags_change_revisions(user_id, room_id, stream_id);
|
||||||
|
CREATE INDEX room_tags_change_revisions_idx ON room_tags_change_revisions(user_id, stream_id);
|
||||||
@@ -525,6 +525,36 @@ class StreamStore(SQLBaseStore):
|
|||||||
int(stream),
|
int(stream),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_last_event_id_ts_for_room(self, room_id, token):
|
||||||
|
"""Get the latest event_id and origin_server_ts for a room_id before a
|
||||||
|
given token.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str)
|
||||||
|
token (str)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with ``event_id`` and ``origin_server_ts`` keys.
|
||||||
|
"""
|
||||||
|
stream_ordering = RoomStreamToken.parse_stream_token(token).stream
|
||||||
|
|
||||||
|
sql = (
|
||||||
|
"SELECT event_id, origin_server_ts FROM events"
|
||||||
|
" WHERE room_id = ? AND stream_ordering <= ?"
|
||||||
|
" ORDER BY topological_ordering DESC, stream_ordering DESC"
|
||||||
|
" LIMIT 1"
|
||||||
|
)
|
||||||
|
|
||||||
|
def f(txn):
|
||||||
|
txn.execute(sql, (room_id, stream_ordering))
|
||||||
|
rows = self.cursor_to_dict(txn)
|
||||||
|
if rows:
|
||||||
|
return rows[0]
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return self.runInteraction("get_last_event_id_ts_for_room", f)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_events_around(self, room_id, event_id, before_limit, after_limit):
|
def get_events_around(self, room_id, event_id, before_limit, after_limit):
|
||||||
"""Retrieve events and pagination tokens around a given event in a
|
"""Retrieve events and pagination tokens around a given event in a
|
||||||
|
|||||||
@@ -17,12 +17,18 @@ from ._base import SQLBaseStore
|
|||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from collections import Counter
|
||||||
|
|
||||||
import ujson as json
|
import ujson as json
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
TAG_CHANGE_NEWLY_TAGGED = "newly_tagged"
|
||||||
|
TAG_CHANGE_ALL_REMOVED = "all_removed"
|
||||||
|
|
||||||
|
|
||||||
class TagsStore(SQLBaseStore):
|
class TagsStore(SQLBaseStore):
|
||||||
def get_max_account_data_stream_id(self):
|
def get_max_account_data_stream_id(self):
|
||||||
"""Get the current max stream id for the private user data stream
|
"""Get the current max stream id for the private user data stream
|
||||||
@@ -170,6 +176,45 @@ class TagsStore(SQLBaseStore):
|
|||||||
row["tag"]: json.loads(row["content"]) for row in rows
|
row["tag"]: json.loads(row["content"]) for row in rows
|
||||||
})
|
})
|
||||||
|
|
||||||
|
def get_room_tags_changed(self, user_id, stream_id, now_id):
|
||||||
|
"""Returns the rooms that have been newly tagged or had all their tags
|
||||||
|
removed since `stream_id`.
|
||||||
|
|
||||||
|
Collapses multiple changes into one. For example, if a room has gone
|
||||||
|
from untagged to tagged back to untagged, the room_id won't be returned.
|
||||||
|
"""
|
||||||
|
changed = self._account_data_stream_cache.has_entity_changed(
|
||||||
|
user_id, int(stream_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
if not changed:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def _get_room_tags_changed(txn):
|
||||||
|
txn.execute(
|
||||||
|
"SELECT room_id, change FROM room_tags_change_revisions"
|
||||||
|
" WHERE user_id = ? AND stream_id > ? AND stream_id <= ?",
|
||||||
|
(user_id, stream_id, now_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
results = Counter()
|
||||||
|
|
||||||
|
for room_id, change in txn.fetchall():
|
||||||
|
if change == TAG_CHANGE_NEWLY_TAGGED:
|
||||||
|
results[room_id] += 1
|
||||||
|
elif change == TAG_CHANGE_ALL_REMOVED:
|
||||||
|
results[room_id] -= 1
|
||||||
|
else:
|
||||||
|
logger.warn("Unexpected tag change: %r", change)
|
||||||
|
|
||||||
|
return {
|
||||||
|
room_id: TAG_CHANGE_NEWLY_TAGGED if count > 0 else TAG_CHANGE_ALL_REMOVED
|
||||||
|
for room_id, count in results.items()
|
||||||
|
if count
|
||||||
|
}
|
||||||
|
|
||||||
|
return self.runInteraction("get_room_tags_changed", _get_room_tags_changed)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def add_tag_to_room(self, user_id, room_id, tag, content):
|
def add_tag_to_room(self, user_id, room_id, tag, content):
|
||||||
"""Add a tag to a room for a user.
|
"""Add a tag to a room for a user.
|
||||||
@@ -184,6 +229,12 @@ class TagsStore(SQLBaseStore):
|
|||||||
content_json = json.dumps(content)
|
content_json = json.dumps(content)
|
||||||
|
|
||||||
def add_tag_txn(txn, next_id):
|
def add_tag_txn(txn, next_id):
|
||||||
|
txn.execute(
|
||||||
|
"SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?",
|
||||||
|
(user_id, room_id),
|
||||||
|
)
|
||||||
|
existing_tags, = txn.fetchone()
|
||||||
|
|
||||||
self._simple_upsert_txn(
|
self._simple_upsert_txn(
|
||||||
txn,
|
txn,
|
||||||
table="room_tags",
|
table="room_tags",
|
||||||
@@ -197,6 +248,17 @@ class TagsStore(SQLBaseStore):
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
self._update_revision_txn(txn, user_id, room_id, next_id)
|
self._update_revision_txn(txn, user_id, room_id, next_id)
|
||||||
|
if not existing_tags:
|
||||||
|
self._simple_insert_txn(
|
||||||
|
txn,
|
||||||
|
table="room_tags_change_revisions",
|
||||||
|
values={
|
||||||
|
"user_id": user_id,
|
||||||
|
"room_id": room_id,
|
||||||
|
"stream_id": next_id,
|
||||||
|
"change": TAG_CHANGE_NEWLY_TAGGED,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
with self._account_data_id_gen.get_next() as next_id:
|
with self._account_data_id_gen.get_next() as next_id:
|
||||||
yield self.runInteraction("add_tag", add_tag_txn, next_id)
|
yield self.runInteraction("add_tag", add_tag_txn, next_id)
|
||||||
@@ -218,6 +280,24 @@ class TagsStore(SQLBaseStore):
|
|||||||
" WHERE user_id = ? AND room_id = ? AND tag = ?"
|
" WHERE user_id = ? AND room_id = ? AND tag = ?"
|
||||||
)
|
)
|
||||||
txn.execute(sql, (user_id, room_id, tag))
|
txn.execute(sql, (user_id, room_id, tag))
|
||||||
|
if txn.rowcount > 0:
|
||||||
|
txn.execute(
|
||||||
|
"SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?",
|
||||||
|
(user_id, room_id),
|
||||||
|
)
|
||||||
|
existing_tags, = txn.fetchone()
|
||||||
|
if not existing_tags:
|
||||||
|
self._simple_insert_txn(
|
||||||
|
txn,
|
||||||
|
table="room_tags_change_revisions",
|
||||||
|
values={
|
||||||
|
"user_id": user_id,
|
||||||
|
"room_id": room_id,
|
||||||
|
"stream_id": next_id,
|
||||||
|
"change": TAG_CHANGE_ALL_REMOVED,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
self._update_revision_txn(txn, user_id, room_id, next_id)
|
self._update_revision_txn(txn, user_id, room_id, next_id)
|
||||||
|
|
||||||
with self._account_data_id_gen.get_next() as next_id:
|
with self._account_data_id_gen.get_next() as next_id:
|
||||||
|
|||||||
@@ -14,7 +14,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.types import StreamToken
|
from synapse.types import StreamToken, SyncNextBatchToken
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
@@ -72,14 +72,18 @@ class PaginationConfig(object):
|
|||||||
if direction not in ['f', 'b']:
|
if direction not in ['f', 'b']:
|
||||||
raise SynapseError(400, "'dir' parameter is invalid.")
|
raise SynapseError(400, "'dir' parameter is invalid.")
|
||||||
|
|
||||||
from_tok = get_param("from")
|
raw_from_tok = get_param("from")
|
||||||
to_tok = get_param("to")
|
to_tok = get_param("to")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if from_tok == "END":
|
from_tok = None
|
||||||
|
if raw_from_tok == "END":
|
||||||
from_tok = None # For backwards compat.
|
from_tok = None # For backwards compat.
|
||||||
elif from_tok:
|
elif raw_from_tok:
|
||||||
from_tok = StreamToken.from_string(from_tok)
|
try:
|
||||||
|
from_tok = SyncNextBatchToken.from_string(raw_from_tok).stream_token
|
||||||
|
except:
|
||||||
|
from_tok = StreamToken.from_string(raw_from_tok)
|
||||||
except:
|
except:
|
||||||
raise SynapseError(400, "'from' paramater is invalid")
|
raise SynapseError(400, "'from' paramater is invalid")
|
||||||
|
|
||||||
|
|||||||
@@ -17,6 +17,9 @@ from synapse.api.errors import SynapseError
|
|||||||
|
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
|
|
||||||
|
from unpaddedbase64 import encode_base64, decode_base64
|
||||||
|
import cbor2 as serializer
|
||||||
|
|
||||||
|
|
||||||
Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
|
Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
|
||||||
|
|
||||||
@@ -115,8 +118,71 @@ class EventID(DomainSpecificString):
|
|||||||
SIGIL = "$"
|
SIGIL = "$"
|
||||||
|
|
||||||
|
|
||||||
|
class SyncNextBatchToken(
|
||||||
|
namedtuple("SyncNextBatchToken", (
|
||||||
|
"stream_token",
|
||||||
|
"pagination_state",
|
||||||
|
))
|
||||||
|
):
|
||||||
|
@classmethod
|
||||||
|
def from_string(cls, string):
|
||||||
|
try:
|
||||||
|
d = serializer.loads(decode_base64(string))
|
||||||
|
pa = d.get("pa", None)
|
||||||
|
if pa:
|
||||||
|
pa = SyncPaginationState.from_dict(pa)
|
||||||
|
return cls(
|
||||||
|
stream_token=StreamToken.from_arr(d["t"]),
|
||||||
|
pagination_state=pa,
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
raise SynapseError(400, "Invalid Token")
|
||||||
|
|
||||||
|
def to_string(self):
|
||||||
|
return encode_base64(serializer.dumps({
|
||||||
|
"t": self.stream_token.to_arr(),
|
||||||
|
"pa": self.pagination_state.to_dict() if self.pagination_state else None,
|
||||||
|
}))
|
||||||
|
|
||||||
|
def replace(self, **kwargs):
|
||||||
|
return self._replace(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
_ORDER_ENCODE = {"m.origin_server_ts": "o"}
|
||||||
|
_ORDER_DECODE = {v: k for k, v in _ORDER_ENCODE.items()}
|
||||||
|
_TAG_ENCODE = {"m.include_all": "i", "m.ignore": "x"}
|
||||||
|
_TAG_DECODE = {v: k for k, v in _TAG_ENCODE.items()}
|
||||||
|
|
||||||
|
|
||||||
|
class SyncPaginationState(
|
||||||
|
namedtuple("SyncPaginationState", (
|
||||||
|
"order",
|
||||||
|
"value",
|
||||||
|
"limit",
|
||||||
|
"tags",
|
||||||
|
))
|
||||||
|
):
|
||||||
|
@classmethod
|
||||||
|
def from_dict(cls, d):
|
||||||
|
try:
|
||||||
|
return cls(_ORDER_DECODE[d["o"]], d["v"], d["l"], _TAG_DECODE[d["t"]])
|
||||||
|
except:
|
||||||
|
raise SynapseError(400, "Invalid Token")
|
||||||
|
|
||||||
|
def to_dict(self):
|
||||||
|
return {
|
||||||
|
"o": _ORDER_ENCODE[self.order],
|
||||||
|
"v": self.value,
|
||||||
|
"l": self.limit,
|
||||||
|
"t": _TAG_ENCODE[self.tags],
|
||||||
|
}
|
||||||
|
|
||||||
|
def replace(self, **kwargs):
|
||||||
|
return self._replace(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
class StreamToken(
|
class StreamToken(
|
||||||
namedtuple("Token", (
|
namedtuple("StreamToken", (
|
||||||
"room_key",
|
"room_key",
|
||||||
"presence_key",
|
"presence_key",
|
||||||
"typing_key",
|
"typing_key",
|
||||||
@@ -141,6 +207,20 @@ class StreamToken(
|
|||||||
def to_string(self):
|
def to_string(self):
|
||||||
return self._SEPARATOR.join([str(k) for k in self])
|
return self._SEPARATOR.join([str(k) for k in self])
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_arr(cls, arr):
|
||||||
|
try:
|
||||||
|
keys = arr
|
||||||
|
while len(keys) < len(cls._fields):
|
||||||
|
# i.e. old token from before receipt_key
|
||||||
|
keys.append("0")
|
||||||
|
return cls(*keys)
|
||||||
|
except:
|
||||||
|
raise SynapseError(400, "Invalid Token")
|
||||||
|
|
||||||
|
def to_arr(self):
|
||||||
|
return self
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def room_stream_id(self):
|
def room_stream_id(self):
|
||||||
# TODO(markjh): Awful hack to work around hacks in the presence tests
|
# TODO(markjh): Awful hack to work around hacks in the presence tests
|
||||||
|
|||||||
Reference in New Issue
Block a user