Compare commits

...

43 Commits

Author SHA1 Message Date
Erik Johnston d2b59f2482 Implement top-level unread_notifications 2016-06-28 10:55:54 +01:00
Erik Johnston 6c137b321d Encode batch tokens better 2016-06-27 15:21:12 +01:00
Erik Johnston 4b7abedfd9 Comments 2016-06-27 15:10:39 +01:00
Erik Johnston f07f99387e Use cbor 2016-06-27 14:24:50 +01:00
Erik Johnston 4c67e06dfb Use JSON instead of msgpack 2016-06-27 13:18:04 +01:00
Erik Johnston 92c58932d1 More logging 2016-06-27 11:51:06 +01:00
Erik Johnston 3263e12d73 Try serializing as json rather than msgpack 2016-06-27 11:24:58 +01:00
Erik Johnston c0b2f33dc2 Logging 2016-06-27 10:34:52 +01:00
Erik Johnston 3ace9bdff9 Empty commit 2016-06-24 16:34:37 +01:00
Erik Johnston 434c51d538 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-06-24 16:09:23 +01:00
Erik Johnston a72919b748 Add get_last_event_id_ts_for_room to slave DataStore 2016-06-24 13:54:06 +01:00
Erik Johnston 62050d2dfb Comments 2016-06-24 11:11:53 +01:00
Erik Johnston bf0edf7a16 Make jenkins-unittests.sh install deps 2016-06-24 11:05:09 +01:00
Erik Johnston 9df5f81687 Make get_room_tags_changed take a now position. Comments 2016-06-23 17:50:30 +01:00
Erik Johnston a7e6ad9f3e Use SyncExtras 2016-06-23 17:26:27 +01:00
Erik Johnston 6c8c061c2f Move stuff into separate function 2016-06-23 16:25:11 +01:00
Erik Johnston 7b3324e252 Get rid of per room full_state flag 2016-06-23 15:48:33 +01:00
Erik Johnston 8c3fca8b28 Correctly handle tags changing in paginated sync 2016-06-23 13:43:25 +01:00
Erik Johnston a90140358b Change default tag handling 2016-06-23 10:40:43 +01:00
Erik Johnston baab93b0dd Implement 'synced' flag 2016-06-22 11:40:06 +01:00
Erik Johnston 839088e2e7 Support streaming peek 2016-06-22 11:02:27 +01:00
Erik Johnston 6a101e512f Add tag handling 2016-06-22 10:59:24 +01:00
Erik Johnston cdd379b6df Use msgpack for shorter tokens 2016-06-21 11:36:28 +01:00
Erik Johnston 3b6027dbc1 Always include tags 2016-06-21 11:18:09 +01:00
Erik Johnston 6992fb9bc1 Implement error responses 2016-06-21 10:29:44 +01:00
Erik Johnston 22dea0ca37 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-06-20 14:39:58 +01:00
Erik Johnston 96d6fff447 Fix 'A next_batch token can be used in the v1 messages API' 2016-06-16 11:33:53 +01:00
Erik Johnston 2b0f9bddcf Merge branch 'develop' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-06-16 10:40:17 +01:00
Erik Johnston 3b52bd1cf6 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-06-14 15:22:21 +01:00
Erik Johnston e5b3034fc4 Indicate if /sync was limited or not 2016-05-25 17:00:59 +01:00
Erik Johnston 43cbde4653 Basic extra include pagination impl 2016-05-25 15:54:32 +01:00
Erik Johnston 26c7f08465 Implement basic pagination 2016-05-25 10:14:38 +01:00
Erik Johnston 4902770e32 Merge branch 'erikj/sync_refactor' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-05-24 16:28:51 +01:00
Erik Johnston 38d90e0d7d Add POST /sync API endpoint 2016-05-20 14:42:25 +01:00
Erik Johnston 99a7205093 Change name 2016-05-20 11:11:42 +01:00
Erik Johnston 5941346c5b Merge branch 'develop' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-05-18 11:38:24 +01:00
Erik Johnston 573e51cc0b Correctly order recents 2016-05-18 11:33:26 +01:00
Erik Johnston 39182c3594 Typo 2016-05-18 11:30:01 +01:00
Erik Johnston b999adcaa2 Filter before ordering 2016-05-18 11:28:26 +01:00
Erik Johnston d1e9655f75 Call get_last_ts less 2016-05-17 15:37:48 +01:00
Erik Johnston 64df836067 Correctly figure out which rooms we've sent down 2016-05-17 14:23:13 +01:00
Erik Johnston 32d476d4f1 Change token format 2016-05-16 16:59:18 +01:00
Erik Johnston a2decbdd66 Only load the last N joined room 2016-05-16 13:31:22 +01:00
13 changed files with 803 additions and 65 deletions

View File

@@ -20,6 +20,10 @@ export DUMP_COVERAGE_COMMAND="coverage help"
 # UNSTABLE or FAILURE this build.
 export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
 
+TOX_BIN=$WORKSPACE/.tox/py27/bin
+python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
+$TOX_BIN/pip install lxml
+
 rm .coverage* || echo "No coverage files to remove"
 tox -e py27

View File

@@ -44,6 +44,7 @@ class Codes(object):
     THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED"
     THREEPID_IN_USE = "THREEPID_IN_USE"
     INVALID_USERNAME = "M_INVALID_USERNAME"
+    CANNOT_PEEK = "M_CANNOT_PEEK"
 
 
 class CodeMessageException(RuntimeError):

View File

@@ -20,6 +20,9 @@ from synapse.util.metrics import Measure
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.visibility import filter_events_for_client
+from synapse.types import SyncNextBatchToken, SyncPaginationState
+from synapse.api.errors import Codes, SynapseError
+from synapse.storage.tags import (TAG_CHANGE_NEWLY_TAGGED, TAG_CHANGE_ALL_REMOVED)
 
 from twisted.internet import defer
@@ -35,9 +38,48 @@ SyncConfig = collections.namedtuple("SyncConfig", [
     "filter_collection",
     "is_guest",
     "request_key",
+    "pagination_config",
 ])
 
 
+class SyncPaginationConfig(collections.namedtuple("SyncPaginationConfig", [
+    "order",
+    "limit",
+    "tags",
+])):
+    "Initial pagination configuration from initial sync."
+
+    def __init__(self, order, limit, tags):
+        if order not in SYNC_PAGINATION_VALID_ORDERS:
+            raise SynapseError(400, "Invalid 'order'")
+        if tags not in SYNC_PAGINATION_VALID_TAGS_OPTIONS:
+            raise SynapseError(400, "Invalid 'tags'")
+
+        try:
+            limit = int(limit)
+        except:
+            raise SynapseError(400, "Invalid 'limit'")
+
+        super(SyncPaginationConfig, self).__init__(order, limit, tags)
+
+
+SYNC_PAGINATION_TAGS_INCLUDE_ALL = "m.include_all"
+SYNC_PAGINATION_TAGS_IGNORE = "m.ignore"
+SYNC_PAGINATION_VALID_TAGS_OPTIONS = (
+    SYNC_PAGINATION_TAGS_INCLUDE_ALL, SYNC_PAGINATION_TAGS_IGNORE,
+)
+
+SYNC_PAGINATION_ORDER_TS = "m.origin_server_ts"
+SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
+
+
+SyncExtras = collections.namedtuple("SyncExtras", [
+    "paginate",  # dict with "limit" key
+    "peek",  # dict of room_id -> dict
+])
+
+DEFAULT_SYNC_EXTRAS = SyncExtras(paginate={}, peek={})
+
+
 class TimelineBatch(collections.namedtuple("TimelineBatch", [
     "prev_batch",
     "events",
@@ -59,6 +101,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
     "ephemeral",
     "account_data",
     "unread_notifications",
+    "synced",  # bool
 ])):
     __slots__ = []
@@ -106,6 +149,18 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
         return True
 
 
+class ErrorSyncResult(collections.namedtuple("ErrorSyncResult", [
+    "room_id",  # str
+    "errcode",  # str
+    "error",  # str
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        """Errors should always be reported to the client"""
+        return True
+
+
 class SyncResult(collections.namedtuple("SyncResult", [
     "next_batch",  # Token for the next sync
     "presence",  # List of presence events for the user.
@@ -113,6 +168,9 @@ class SyncResult(collections.namedtuple("SyncResult", [
     "joined",  # JoinedSyncResult for each joined room.
     "invited",  # InvitedSyncResult for each invited room.
     "archived",  # ArchivedSyncResult for each archived room.
+    "errors",  # ErrorSyncResult
+    "pagination_info",
+    "unread_notifications",
 ])):
     __slots__ = []
@@ -140,8 +198,8 @@ class SyncHandler(object):
         self.clock = hs.get_clock()
         self.response_cache = ResponseCache()
 
-    def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
-                               full_state=False):
+    def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
+                               full_state=False, extras=DEFAULT_SYNC_EXTRAS):
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
@@ -153,48 +211,42 @@ class SyncHandler(object):
         result = self.response_cache.set(
             sync_config.request_key,
             self._wait_for_sync_for_user(
-                sync_config, since_token, timeout, full_state
+                sync_config, batch_token, timeout, full_state, extras,
             )
         )
         return result
 
     @defer.inlineCallbacks
-    def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
-                                full_state):
+    def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
+                                full_state, extras=DEFAULT_SYNC_EXTRAS):
         context = LoggingContext.current_context()
         if context:
-            if since_token is None:
+            if batch_token is None:
                 context.tag = "initial_sync"
             elif full_state:
                 context.tag = "full_state_sync"
             else:
                 context.tag = "incremental_sync"
 
-        if timeout == 0 or since_token is None or full_state:
+        if timeout == 0 or batch_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
-            result = yield self.current_sync_for_user(
-                sync_config, since_token, full_state=full_state,
+            result = yield self.generate_sync_result(
+                sync_config, batch_token, full_state=full_state, extras=extras,
             )
             defer.returnValue(result)
         else:
             def current_sync_callback(before_token, after_token):
-                return self.current_sync_for_user(sync_config, since_token)
+                return self.generate_sync_result(
+                    sync_config, batch_token, full_state=False, extras=extras,
+                )
 
             result = yield self.notifier.wait_for_events(
                 sync_config.user.to_string(), timeout, current_sync_callback,
-                from_token=since_token,
+                from_token=batch_token.stream_token,
             )
             defer.returnValue(result)
 
-    def current_sync_for_user(self, sync_config, since_token=None,
-                              full_state=False):
-        """Get the sync for client needed to match what the server has now.
-
-        Returns:
-            A Deferred SyncResult.
-        """
-        return self.generate_sync_result(sync_config, since_token, full_state)
-
     @defer.inlineCallbacks
     def push_rules_for_user(self, user):
         user_id = user.to_string()
@@ -490,13 +542,15 @@ class SyncHandler(object):
         defer.returnValue(None)
 
     @defer.inlineCallbacks
-    def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+    def generate_sync_result(self, sync_config, batch_token=None, full_state=False,
+                             extras=DEFAULT_SYNC_EXTRAS):
         """Generates a sync result.
 
         Args:
             sync_config (SyncConfig)
             since_token (StreamToken)
             full_state (bool)
+            extras (SyncExtras)
 
         Returns:
             Deferred(SyncResult)
@@ -508,10 +562,16 @@ class SyncHandler(object):
         # Always use the `now_token` in `SyncResultBuilder`
         now_token = yield self.event_sources.get_current_token()
 
+        all_joined_rooms = yield self.store.get_rooms_for_user(
+            sync_config.user.to_string()
+        )
+        all_joined_rooms = [room.room_id for room in all_joined_rooms]
+
         sync_result_builder = SyncResultBuilder(
             sync_config, full_state,
-            since_token=since_token,
+            batch_token=batch_token,
             now_token=now_token,
+            all_joined_rooms=all_joined_rooms,
         )
 
         account_data_by_room = yield self._generate_sync_entry_for_account_data(
@@ -519,7 +579,7 @@ class SyncHandler(object):
         )
 
         res = yield self._generate_sync_entry_for_rooms(
-            sync_result_builder, account_data_by_room
+            sync_result_builder, account_data_by_room, extras,
         )
         newly_joined_rooms, newly_joined_users = res
@@ -527,15 +587,55 @@ class SyncHandler(object):
             sync_result_builder, newly_joined_rooms, newly_joined_users
         )
 
+        yield self._generate_notification_counts(sync_result_builder)
+
         defer.returnValue(SyncResult(
             presence=sync_result_builder.presence,
             account_data=sync_result_builder.account_data,
             joined=sync_result_builder.joined,
             invited=sync_result_builder.invited,
             archived=sync_result_builder.archived,
-            next_batch=sync_result_builder.now_token,
+            errors=sync_result_builder.errors,
+            next_batch=SyncNextBatchToken(
+                stream_token=sync_result_builder.now_token,
+                pagination_state=sync_result_builder.pagination_state,
+            ),
+            pagination_info=sync_result_builder.pagination_info,
+            unread_notifications=sync_result_builder.unread_notifications,
        ))
 
+    @defer.inlineCallbacks
+    def _generate_notification_counts(self, sync_result_builder):
+        rooms = sync_result_builder.all_joined_rooms
+
+        total_notif_count = [0]
+        rooms_with_notifs = set()
+        total_highlight_count = [0]
+        rooms_with_highlights = set()
+
+        @defer.inlineCallbacks
+        def notif_for_room(room_id):
+            notifs = yield self.unread_notifs_for_room_id(
+                room_id, sync_result_builder.sync_config
+            )
+            if notifs is not None:
+                total_notif_count[0] += notifs["notify_count"]
+                total_highlight_count[0] += notifs["highlight_count"]
+                if notifs["notify_count"]:
+                    rooms_with_notifs.add(room_id)
+                if notifs["highlight_count"]:
+                    rooms_with_highlights.add(room_id)
+
+        yield concurrently_execute(notif_for_room, rooms, 10)
+
+        sync_result_builder.unread_notifications = {
+            "total_notification_count": total_notif_count[0],
+            "rooms_notification_count": len(rooms_with_notifs),
+            "total_highlight_count": total_highlight_count[0],
+            "rooms_highlight_count": len(rooms_with_highlights),
+        }
+
     @defer.inlineCallbacks
     def _generate_sync_entry_for_account_data(self, sync_result_builder):
         """Generates the account data portion of the sync response. Populates
@@ -646,7 +746,8 @@ class SyncHandler(object):
         sync_result_builder.presence = presence
 
     @defer.inlineCallbacks
-    def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
+    def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room,
+                                       extras):
         """Generates the rooms portion of the sync response. Populates the
         `sync_result_builder` with the result.
@@ -690,6 +791,12 @@ class SyncHandler(object):
         tags_by_room = yield self.store.get_tags_for_user(user_id)
 
+        yield self._update_room_entries_for_paginated_sync(
+            sync_result_builder, room_entries, extras
+        )
+
+        sync_result_builder.full_state |= sync_result_builder.since_token is None
+
         def handle_room_entries(room_entry):
             return self._generate_room_entry(
                 sync_result_builder,
@@ -698,7 +805,6 @@ class SyncHandler(object):
                 ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
                 tags=tags_by_room.get(room_entry.room_id),
                 account_data=account_data_by_room.get(room_entry.room_id, {}),
-                always_include=sync_result_builder.full_state,
             )
 
         yield concurrently_execute(handle_room_entries, room_entries, 10)
@@ -719,6 +825,162 @@ class SyncHandler(object):
 
         defer.returnValue((newly_joined_rooms, newly_joined_users))
 
+    @defer.inlineCallbacks
+    def _update_room_entries_for_paginated_sync(self, sync_result_builder,
+                                                room_entries, extras):
+        """Works out which room_entries should be synced to the client, which
+        would need to be resynced if they were sent down, etc.
+
+        Mutates room_entries.
+
+        Args:
+            sync_result_builder (SyncResultBuilder)
+            room_entries (list(RoomSyncResultBuilder))
+            extras (SyncExtras)
+        """
+        user_id = sync_result_builder.sync_config.user.to_string()
+        sync_config = sync_result_builder.sync_config
+        if sync_config.pagination_config:
+            pagination_config = sync_config.pagination_config
+            old_pagination_value = 0
+            include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL
+        elif sync_result_builder.pagination_state:
+            pagination_config = SyncPaginationConfig(
+                order=sync_result_builder.pagination_state.order,
+                limit=sync_result_builder.pagination_state.limit,
+                tags=sync_result_builder.pagination_state.tags,
+            )
+            old_pagination_value = sync_result_builder.pagination_state.value
+            include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL
+        else:
+            pagination_config = None
+            old_pagination_value = 0
+            include_all_tags = False
+
+        if sync_result_builder.pagination_state:
+            missing_state = yield self._get_rooms_that_need_full_state(
+                room_ids=[r.room_id for r in room_entries],
+                sync_config=sync_config,
+                since_token=sync_result_builder.since_token,
+                pagination_state=sync_result_builder.pagination_state,
+            )
+
+            all_tags = yield self.store.get_tags_for_user(user_id)
+
+            if sync_result_builder.since_token:
+                stream_id = sync_result_builder.since_token.account_data_key
+                now_stream_id = sync_result_builder.now_token.account_data_key
+                tag_changes = yield self.store.get_room_tags_changed(
+                    user_id, stream_id, now_stream_id
+                )
+            else:
+                tag_changes = {}
+
+            if missing_state:
+                for r in room_entries:
+                    if r.room_id in missing_state:
+                        if include_all_tags:
+                            # If we're always including tagged rooms, then only
+                            # resync rooms which are newly tagged.
+                            change = tag_changes.get(r.room_id)
+                            if change == TAG_CHANGE_NEWLY_TAGGED:
+                                r.always_include = True
+                                r.would_require_resync = True
+                                r.synced = True
+                                continue
+                            elif change == TAG_CHANGE_ALL_REMOVED:
+                                r.always_include = True
+                                r.synced = False
+                                continue
+                            elif r.room_id in all_tags:
+                                r.always_include = True
+                                continue
+                        if r.room_id in extras.peek:
+                            since = extras.peek[r.room_id].get("since", None)
+                            if since:
+                                tok = SyncNextBatchToken.from_string(since)
+                                r.since_token = tok.stream_token
+                            else:
+                                r.always_include = True
+                                r.would_require_resync = True
+                                r.synced = False
+                        else:
+                            r.would_require_resync = True
+        elif pagination_config and include_all_tags:
+            all_tags = yield self.store.get_tags_for_user(user_id)
+            for r in room_entries:
+                if r.room_id in all_tags:
+                    r.always_include = True
+
+        for room_id in set(extras.peek.keys()) - {r.room_id for r in room_entries}:
+            sync_result_builder.errors.append(ErrorSyncResult(
+                room_id=room_id,
+                errcode=Codes.CANNOT_PEEK,
+                error="Cannot peek into requested room",
+            ))
+
+        if pagination_config:
+            room_ids = [r.room_id for r in room_entries]
+            pagination_limit = pagination_config.limit
+            extra_limit = extras.paginate.get("limit", 0)
+
+            room_map = yield self._get_room_timestamps_at_token(
+                room_ids, sync_result_builder.now_token, sync_config,
+                pagination_limit + extra_limit + 1,
+            )
+
+            limited = False
+            if room_map:
+                sorted_list = sorted(
+                    room_map.items(),
+                    key=lambda item: -item[1]
+                )
+
+                cutoff_list = sorted_list[:pagination_limit + extra_limit]
+
+                if cutoff_list[pagination_limit:]:
+                    new_room_ids = set(r[0] for r in cutoff_list[pagination_limit:])
+                    for r in room_entries:
+                        if r.room_id in new_room_ids:
+                            r.always_include = True
+                            r.would_require_resync = True
+
+                _, bottom_ts = cutoff_list[-1]
+                new_pagination_value = bottom_ts
+
+                # We're limited if there are any rooms that are after cutoff
+                # in the list, but still have an origin server ts from after
+                # the pagination value from the since token.
+                limited = any(
+                    old_pagination_value < r[1]
+                    for r in sorted_list[pagination_limit + extra_limit:]
+                )
+
+                sync_result_builder.pagination_state = SyncPaginationState(
+                    order=pagination_config.order, value=new_pagination_value,
+                    limit=pagination_limit + extra_limit,
+                    tags=pagination_config.tags,
+                )
+
+                to_sync_map = dict(cutoff_list)
+            else:
+                to_sync_map = {}
+
+            sync_result_builder.pagination_info["limited"] = limited
+
+            if len(room_map) == len(room_entries):
+                sync_result_builder.pagination_state = None
+
+            room_entries[:] = [
+                r for r in room_entries
+                if r.room_id in to_sync_map or r.always_include
+            ]
+
     @defer.inlineCallbacks
     def _get_rooms_changed(self, sync_result_builder, ignored_users):
         """Gets the the changes that have happened since the last sync.
@@ -809,7 +1071,6 @@ class SyncHandler(object):
                     rtype="archived",
                     events=None,
                     newly_joined=room_id in newly_joined_rooms,
-                    full_state=False,
                     since_token=since_token,
                     upto_token=leave_token,
                 ))
@@ -839,7 +1100,6 @@ class SyncHandler(object):
                         rtype="joined",
                         events=events,
                         newly_joined=room_id in newly_joined_rooms,
-                        full_state=False,
                        since_token=None if room_id in newly_joined_rooms else since_token,
                         upto_token=prev_batch_token,
                     ))
@@ -849,7 +1109,6 @@ class SyncHandler(object):
                         rtype="joined",
                         events=[],
                         newly_joined=room_id in newly_joined_rooms,
-                        full_state=False,
                         since_token=since_token,
                         upto_token=since_token,
                     ))
@@ -893,7 +1152,6 @@ class SyncHandler(object):
                 rtype="joined",
                 events=None,
                 newly_joined=False,
-                full_state=True,
                 since_token=since_token,
                 upto_token=now_token,
             ))
@@ -920,7 +1178,6 @@ class SyncHandler(object):
                 rtype="archived",
                 events=None,
                 newly_joined=False,
-                full_state=True,
                 since_token=since_token,
                 upto_token=leave_token,
             ))
@@ -929,8 +1186,7 @@ class SyncHandler(object):
     @defer.inlineCallbacks
     def _generate_room_entry(self, sync_result_builder, ignored_users,
-                             room_builder, ephemeral, tags, account_data,
-                             always_include=False):
+                             room_builder, ephemeral, tags, account_data):
         """Populates the `joined` and `archived` section of `sync_result_builder`
         based on the `room_builder`.
@@ -946,19 +1202,23 @@ class SyncHandler(object):
             even if empty.
         """
         newly_joined = room_builder.newly_joined
-        full_state = (
-            room_builder.full_state
-            or newly_joined
+        always_include = (
+            newly_joined
             or sync_result_builder.full_state
+            or room_builder.always_include
+        )
+        full_state = (
+            newly_joined
+            or sync_result_builder.full_state
+            or room_builder.would_require_resync
         )
         events = room_builder.events
 
         # We want to shortcut out as early as possible.
-        if not (always_include or account_data or ephemeral or full_state):
+        if not (always_include or account_data or ephemeral):
             if events == [] and tags is None:
                 return
 
-        since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
@@ -993,9 +1253,20 @@ class SyncHandler(object):
         ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
 
-        if not (always_include or batch or account_data or ephemeral or full_state):
+        if not (always_include or batch or account_data or ephemeral):
             return
 
+        # At this point we're guarenteed (?) to send down the room, so if we
+        # need to resync the entire room do so now.
+        if room_builder.would_require_resync:
+            batch = yield self._load_filtered_recents(
+                room_id, sync_config,
+                now_token=upto_token,
+                since_token=None,
+                recents=None,
+                newly_joined_room=newly_joined,
+            )
+
         state = yield self.compute_state_delta(
             room_id, batch, sync_config, since_token, now_token,
             full_state=full_state
@@ -1010,6 +1281,7 @@ class SyncHandler(object):
                 ephemeral=ephemeral,
                 account_data=account_data_events,
                 unread_notifications=unread_notifications,
+                synced=room_builder.synced,
             )
 
             if room_sync or always_include:
@@ -1034,6 +1306,90 @@ class SyncHandler(object):
         else:
             raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
+    @defer.inlineCallbacks
+    def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit):
+        """For each room, get the last origin_server_ts timestamp the client
+        would see (after filtering) at a particular token.
+
+        Only attempts finds the latest `limit` room timestamps.
+        """
+        room_to_entries = {}
+
+        @defer.inlineCallbacks
+        def _get_last_ts(room_id):
+            entry = yield self.store.get_last_event_id_ts_for_room(
+                room_id, token.room_key
+            )
+
+            # TODO: Is this ever possible?
+            room_to_entries[room_id] = entry if entry else {
+                "origin_server_ts": 0,
+            }
+
+        yield concurrently_execute(_get_last_ts, room_ids, 10)
+
+        if len(room_to_entries) <= limit:
+            defer.returnValue({
+                room_id: entry["origin_server_ts"]
+                for room_id, entry in room_to_entries.items()
+            })
+
+        queued_events = sorted(
+            room_to_entries.items(),
+            key=lambda e: -e[1]["origin_server_ts"]
+        )
+
+        to_return = {}
+
+        while len(to_return) < limit and len(queued_events) > 0:
+            to_fetch = queued_events[:limit - len(to_return)]
+            event_to_q = {
+                e["event_id"]: (room_id, e) for room_id, e in to_fetch
+                if "event_id" in e
+            }
+
+            # Now we fetch each event to check if its been filtered out
+            event_map = yield self.store.get_events(event_to_q.keys())
+
+            recents = sync_config.filter_collection.filter_room_timeline(
+                event_map.values()
+            )
+            recents = yield filter_events_for_client(
+                self.store,
+                sync_config.user.to_string(),
+                recents,
+            )
+
+            to_return.update({r.room_id: r.origin_server_ts for r in recents})
+
+            for ev_id in set(event_map.keys()) - set(r.event_id for r in recents):
+                queued_events.append(event_to_q[ev_id])
+
+            # FIXME: Need to refetch TS
+            queued_events.sort(key=lambda e: -e[1]["origin_server_ts"])
+
+        defer.returnValue(to_return)
+
+    @defer.inlineCallbacks
+    def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token,
+                                        pagination_state):
+        """Work out which rooms we haven't sent to the client yet, so would
+        require us to send down the full state
+        """
+        start_ts = yield self._get_room_timestamps_at_token(
+            room_ids, since_token,
+            sync_config=sync_config,
+            limit=len(room_ids),
+        )
+
+        missing_list = frozenset(
+            room_id for room_id, ts in
+            sorted(start_ts.items(), key=lambda item: -item[1])
+            if ts < pagination_state.value
+        )
+
+        defer.returnValue(missing_list)
+
 def _action_has_highlight(actions):
     for action in actions:
@@ -1085,31 +1441,53 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
 class SyncResultBuilder(object):
     "Used to help build up a new SyncResult for a user"
-    def __init__(self, sync_config, full_state, since_token, now_token):
+
+    __slots__ = (
+        "sync_config", "full_state", "batch_token", "since_token", "pagination_state",
+        "now_token", "presence", "account_data", "joined", "invited", "archived",
+        "pagination_info", "errors", "all_joined_rooms", "unread_notifications",
+    )
+
+    def __init__(self, sync_config, full_state, batch_token, now_token,
+                 all_joined_rooms):
         """
         Args:
             sync_config(SyncConfig)
             full_state(bool): The full_state flag as specified by user
-            since_token(StreamToken): The token supplied by user, or None.
+            batch_token(SyncNextBatchToken): The token supplied by user, or None.
             now_token(StreamToken): The token to sync up to.
+            all_joined_rooms(list(str)): List of all joined room ids.
         """
         self.sync_config = sync_config
         self.full_state = full_state
-        self.since_token = since_token
+        self.batch_token = batch_token
+        self.since_token = batch_token.stream_token if batch_token else None
+        self.pagination_state = batch_token.pagination_state if batch_token else None
         self.now_token = now_token
+        self.all_joined_rooms = all_joined_rooms
 
         self.presence = []
         self.account_data = []
         self.joined = []
         self.invited = []
         self.archived = []
+        self.errors = []
+        self.pagination_info = {}
+        self.unread_notifications = {}
 
 
 class RoomSyncResultBuilder(object):
     """Stores information needed to create either a `JoinedSyncResult` or
     `ArchivedSyncResult`.
     """
-    def __init__(self, room_id, rtype, events, newly_joined, full_state,
+
+    __slots__ = (
+        "room_id", "rtype", "events", "newly_joined", "since_token",
+        "upto_token", "always_include", "would_require_resync", "synced",
+    )
+
+    def __init__(self, room_id, rtype, events, newly_joined,
                  since_token, upto_token):
         """
         Args:
@@ -1118,7 +1496,6 @@ class RoomSyncResultBuilder(object):
             events(list): List of events to include in the room, (more events
                 may be added when generating result).
             newly_joined(bool): If the user has newly joined the room
-            full_state(bool): Whether the full state should be sent in result
             since_token(StreamToken): Earliest point to return events from, or None
             upto_token(StreamToken): Latest point to return events from.
         """
@@ -1126,6 +1503,12 @@ class RoomSyncResultBuilder(object):
         self.rtype = rtype
         self.events = events
         self.newly_joined = newly_joined
-        self.full_state = full_state
         self.since_token = since_token
         self.upto_token = upto_token
+
+        # Should this room always be included in the sync?
+        self.always_include = False
+        # If we send down this room, should we send down the full state?
+        self.would_require_resync = False
+        # Should the client consider this room "synced"?
+        self.synced = True
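
Note (not part of the diff): a minimal client-side sketch of driving the paginated sync flow implemented above. The homeserver URL, access token, and the /_matrix/client/r0/sync mount point are assumptions; the request body shape follows on_POST in the REST servlet change further down.

import requests

HOMESERVER = "https://localhost:8448"  # assumption
ACCESS_TOKEN = "example_access_token"  # assumption

def sync(body):
    # POST /sync with a JSON body, per the on_POST handler in this branch.
    resp = requests.post(
        HOMESERVER + "/_matrix/client/r0/sync",  # mount point assumed
        params={"access_token": ACCESS_TOKEN},
        json=body,
    )
    resp.raise_for_status()
    return resp.json()

# Initial sync: order rooms by last origin_server_ts, send down only the 10
# most recently active ones, and always include tagged rooms.
result = sync({
    "timeout": 0,
    "pagination_config": {
        "order": "m.origin_server_ts",
        "limit": 10,
        "tags": "m.include_all",
    },
})

# next_batch now also carries the pagination state; while pagination_info
# reports limited=True, widen the window via extras.paginate.
while result.get("pagination_info", {}).get("limited"):
    result = sync({
        "since": result["next_batch"],
        "extras": {"paginate": {"limit": 10}},
    })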

View File

@@ -36,6 +36,7 @@ REQUIREMENTS = {
     "blist": ["blist"],
     "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
     "pymacaroons-pynacl": ["pymacaroons"],
+    "cbor2": ["cbor2"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {

View File

@@ -51,6 +51,9 @@ class SlavedAccountDataStore(BaseSlavedStore):
     get_updated_account_data_for_user = (
         DataStore.get_updated_account_data_for_user.__func__
     )
+    get_room_tags_changed = (
+        DataStore.get_room_tags_changed.__func__
+    )
 
     def get_max_account_data_stream_id(self):
         return self._account_data_id_gen.get_current_token()

View File

@@ -144,6 +144,8 @@ class SlavedEventStore(BaseSlavedStore):
     _get_events_around_txn = DataStore._get_events_around_txn.__func__
     _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
 
+    get_last_event_id_ts_for_room = DataStore.get_last_event_id_ts_for_room.__func__
+
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()

View File

@@ -16,10 +16,14 @@
 from twisted.internet import defer
 
 from synapse.http.servlet import (
-    RestServlet, parse_string, parse_integer, parse_boolean
+    RestServlet, parse_string, parse_integer, parse_boolean,
+    parse_json_object_from_request,
 )
-from synapse.handlers.sync import SyncConfig
-from synapse.types import StreamToken
+from synapse.handlers.sync import (
+    SyncConfig, SyncPaginationConfig, SYNC_PAGINATION_TAGS_IGNORE, SyncExtras,
+    DEFAULT_SYNC_EXTRAS,
+)
+from synapse.types import SyncNextBatchToken
 from synapse.events.utils import (
     serialize_event, format_event_for_client_v2_without_room_id,
 )
@@ -84,6 +88,94 @@ class SyncRestServlet(RestServlet):
         self.filtering = hs.get_filtering()
         self.presence_handler = hs.get_presence_handler()
 
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        requester = yield self.auth.get_user_by_req(
+            request, allow_guest=True
+        )
+        user = requester.user
+
+        body = parse_json_object_from_request(request)
+
+        timeout = body.get("timeout", 0)
+        since = body.get("since", None)
+
+        extras = body.get("extras", {})
+        extras = SyncExtras(
+            paginate=extras.get("paginate", {}),
+            peek=extras.get("peek", {}),
+        )
+
+        if "from" in body:
+            # /events used to use 'from', but /sync uses 'since'.
+            # Lets be helpful and whine if we see a 'from'.
+            raise SynapseError(
+                400, "'from' is not a valid parameter. Did you mean 'since'?"
+            )
+
+        set_presence = body.get("set_presence", "online")
+        if set_presence not in self.ALLOWED_PRESENCE:
+            message = "Parameter 'set_presence' must be one of [%s]" % (
+                ", ".join(repr(v) for v in self.ALLOWED_PRESENCE)
+            )
+            raise SynapseError(400, message)
+
+        full_state = body.get("full_state", False)
+
+        filter_id = body.get("filter_id", None)
+        filter_dict = body.get("filter", None)
+        pagination_config = body.get("pagination_config", None)
+
+        if filter_dict is not None and filter_id is not None:
+            raise SynapseError(
+                400,
+                "Can only specify one of `filter` and `filter_id` paramters"
+            )
+
+        if filter_id:
+            filter_collection = yield self.filtering.get_user_filter(
+                user.localpart, filter_id
+            )
+            filter_key = filter_id
+        elif filter_dict:
+            self.filtering.check_valid_filter(filter_dict)
+            filter_collection = FilterCollection(filter_dict)
+            filter_key = json.dumps(filter_dict)
+        else:
+            filter_collection = DEFAULT_FILTER_COLLECTION
+            filter_key = None
+
+        request_key = (user, timeout, since, filter_key, full_state)
+
+        sync_config = SyncConfig(
+            user=user,
+            filter_collection=filter_collection,
+            is_guest=requester.is_guest,
+            request_key=request_key,
+            pagination_config=SyncPaginationConfig(
+                order=pagination_config["order"],
+                limit=pagination_config["limit"],
+                tags=pagination_config.get("tags", SYNC_PAGINATION_TAGS_IGNORE),
+            ) if pagination_config else None,
+        )
+
+        if since is not None:
+            batch_token = SyncNextBatchToken.from_string(since)
+        else:
+            batch_token = None
+
+        sync_result = yield self._handle_sync(
+            requester=requester,
+            sync_config=sync_config,
+            batch_token=batch_token,
+            set_presence=set_presence,
+            full_state=full_state,
+            timeout=timeout,
+            extras=extras,
+        )
+
+        defer.returnValue(sync_result)
+
     @defer.inlineCallbacks
     def on_GET(self, request):
         if "from" in request.args:
@@ -107,13 +199,6 @@ class SyncRestServlet(RestServlet):
         filter_id = parse_string(request, "filter", default=None)
         full_state = parse_boolean(request, "full_state", default=False)
 
-        logger.info(
-            "/sync: user=%r, timeout=%r, since=%r,"
-            " set_presence=%r, filter_id=%r" % (
-                user, timeout, since, set_presence, filter_id
-            )
-        )
-
         request_key = (user, timeout, since, filter_id, full_state)
 
         if filter_id:
@@ -136,15 +221,39 @@ class SyncRestServlet(RestServlet):
             filter_collection=filter,
             is_guest=requester.is_guest,
             request_key=request_key,
+            pagination_config=None,
         )
 
         if since is not None:
-            since_token = StreamToken.from_string(since)
+            batch_token = SyncNextBatchToken.from_string(since)
         else:
-            since_token = None
+            batch_token = None
 
+        sync_result = yield self._handle_sync(
+            requester=requester,
+            sync_config=sync_config,
+            batch_token=batch_token,
+            set_presence=set_presence,
+            full_state=full_state,
+            timeout=timeout,
+        )
+
+        defer.returnValue(sync_result)
+
+    @defer.inlineCallbacks
+    def _handle_sync(self, requester, sync_config, batch_token, set_presence,
+                     full_state, timeout, extras=DEFAULT_SYNC_EXTRAS):
         affect_presence = set_presence != PresenceState.OFFLINE
 
+        user = sync_config.user
+
+        logger.info(
+            "/sync: user=%r, timeout=%r, since=%r,"
+            " set_presence=%r" % (
+                user, timeout, batch_token, set_presence
+            )
+        )
+
         if affect_presence:
             yield self.presence_handler.set_state(user, {"presence": set_presence})
@@ -153,8 +262,8 @@
         )
         with context:
             sync_result = yield self.sync_handler.wait_for_sync_for_user(
-                sync_config, since_token=since_token, timeout=timeout,
-                full_state=full_state
+                sync_config, batch_token=batch_token, timeout=timeout,
+                full_state=full_state, extras=extras,
             )
 
         time_now = self.clock.time_msec()
@@ -182,8 +291,15 @@
                 "leave": archived,
             },
             "next_batch": sync_result.next_batch.to_string(),
+            "unread_notifications": sync_result.unread_notifications,
         }
 
+        if sync_result.errors:
+            response_content["rooms"]["errors"] = self.encode_errors(sync_result.errors)
+
+        if sync_result.pagination_info:
+            response_content["pagination_info"] = sync_result.pagination_info
+
         defer.returnValue((200, response_content))
 
     def encode_presence(self, events, time_now):
@@ -194,6 +310,15 @@
             formatted.append(event)
         return {"events": formatted}
 
+    def encode_errors(self, errors):
+        return {
+            e.room_id: {
+                "errcode": e.errcode,
+                "error": e.error
+            }
+            for e in errors
+        }
+
     def encode_joined(self, rooms, time_now, token_id):
         """
         Encode the joined rooms in a sync result
@@ -215,6 +340,7 @@
             joined[room.room_id] = self.encode_room(
                 room, time_now, token_id
             )
+            joined[room.room_id]["synced"] = room.synced
 
         return joined
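
Note (not part of the diff): a hypothetical response fragment illustrating the fields the servlet now emits; room ids, counts, and the token value are invented for illustration.

response_fragment = {
    "rooms": {
        "join": {
            "!abc:example.com": {
                # ... the usual joined-room payload ...
                "synced": True,  # from RoomSyncResultBuilder.synced
            },
        },
        # Only present when a requested peek was rejected (encode_errors above).
        "errors": {
            "!secret:example.com": {
                "errcode": "M_CANNOT_PEEK",
                "error": "Cannot peek into requested room",
            },
        },
    },
    "pagination_info": {"limited": True},
    "unread_notifications": {
        "total_notification_count": 7,
        "rooms_notification_count": 3,
        "total_highlight_count": 1,
        "rooms_highlight_count": 1,
    },
    "next_batch": "WyJzMTIzIl0",  # base64(cbor) batch token, value invented
}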

View File

@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 32
+SCHEMA_VERSION = 33
 
 dir_path = os.path.abspath(os.path.dirname(__file__))

View File

@@ -0,0 +1,24 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE room_tags_change_revisions(
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    stream_id BIGINT NOT NULL,
+    change TEXT NOT NULL
+);
+
+CREATE INDEX room_tags_change_revisions_rm_idx ON room_tags_change_revisions(user_id, room_id, stream_id);
+CREATE INDEX room_tags_change_revisions_idx ON room_tags_change_revisions(user_id, stream_id);

View File

@@ -525,6 +525,36 @@ class StreamStore(SQLBaseStore):
             int(stream),
         )
 
+    def get_last_event_id_ts_for_room(self, room_id, token):
+        """Get the latest event_id and origin_server_ts for a room_id before a
+        given token.
+
+        Args:
+            room_id (str)
+            token (str)
+
+        Returns:
+            Dictionary with ``event_id`` and ``origin_server_ts`` keys.
+        """
+        stream_ordering = RoomStreamToken.parse_stream_token(token).stream
+
+        sql = (
+            "SELECT event_id, origin_server_ts FROM events"
+            " WHERE room_id = ? AND stream_ordering <= ?"
+            " ORDER BY topological_ordering DESC, stream_ordering DESC"
+            " LIMIT 1"
+        )
+
+        def f(txn):
+            txn.execute(sql, (room_id, stream_ordering))
+            rows = self.cursor_to_dict(txn)
+            if rows:
+                return rows[0]
+            else:
+                return None
+
+        return self.runInteraction("get_last_event_id_ts_for_room", f)
+
     @defer.inlineCallbacks
     def get_events_around(self, room_id, event_id, before_limit, after_limit):
         """Retrieve events and pagination tokens around a given event in a

View File

@@ -17,12 +17,18 @@ from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached
 from twisted.internet import defer
 
+from collections import Counter
+
 import ujson as json
 import logging
 
 logger = logging.getLogger(__name__)
 
+TAG_CHANGE_NEWLY_TAGGED = "newly_tagged"
+TAG_CHANGE_ALL_REMOVED = "all_removed"
+
 
 class TagsStore(SQLBaseStore):
     def get_max_account_data_stream_id(self):
         """Get the current max stream id for the private user data stream
@@ -170,6 +176,45 @@ class TagsStore(SQLBaseStore):
             row["tag"]: json.loads(row["content"]) for row in rows
         })
 
+    def get_room_tags_changed(self, user_id, stream_id, now_id):
+        """Returns the rooms that have been newly tagged or had all their tags
+        removed since `stream_id`.
+
+        Collapses multiple changes into one. For example, if a room has gone
+        from untagged to tagged back to untagged, the room_id won't be returned.
+        """
+        changed = self._account_data_stream_cache.has_entity_changed(
+            user_id, int(stream_id)
+        )
+        if not changed:
+            return {}
+
+        def _get_room_tags_changed(txn):
+            txn.execute(
+                "SELECT room_id, change FROM room_tags_change_revisions"
+                " WHERE user_id = ? AND stream_id > ? AND stream_id <= ?",
+                (user_id, stream_id, now_id)
+            )
+
+            results = Counter()
+            for room_id, change in txn.fetchall():
+                if change == TAG_CHANGE_NEWLY_TAGGED:
+                    results[room_id] += 1
+                elif change == TAG_CHANGE_ALL_REMOVED:
+                    results[room_id] -= 1
+                else:
+                    logger.warn("Unexpected tag change: %r", change)
+
+            return {
+                room_id: TAG_CHANGE_NEWLY_TAGGED if count > 0 else TAG_CHANGE_ALL_REMOVED
+                for room_id, count in results.items()
+                if count
+            }
+
+        return self.runInteraction("get_room_tags_changed", _get_room_tags_changed)
+
     @defer.inlineCallbacks
     def add_tag_to_room(self, user_id, room_id, tag, content):
         """Add a tag to a room for a user.
@@ -184,6 +229,12 @@ class TagsStore(SQLBaseStore):
         content_json = json.dumps(content)
 
         def add_tag_txn(txn, next_id):
+            txn.execute(
+                "SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?",
+                (user_id, room_id),
+            )
+            existing_tags, = txn.fetchone()
+
             self._simple_upsert_txn(
                 txn,
                 table="room_tags",
@@ -197,6 +248,17 @@ class TagsStore(SQLBaseStore):
                 }
             )
             self._update_revision_txn(txn, user_id, room_id, next_id)
+            if not existing_tags:
+                self._simple_insert_txn(
+                    txn,
+                    table="room_tags_change_revisions",
+                    values={
+                        "user_id": user_id,
+                        "room_id": room_id,
+                        "stream_id": next_id,
+                        "change": TAG_CHANGE_NEWLY_TAGGED,
+                    }
+                )
 
         with self._account_data_id_gen.get_next() as next_id:
             yield self.runInteraction("add_tag", add_tag_txn, next_id)
@@ -218,6 +280,24 @@ class TagsStore(SQLBaseStore):
                 " WHERE user_id = ? AND room_id = ? AND tag = ?"
             )
             txn.execute(sql, (user_id, room_id, tag))
+            if txn.rowcount > 0:
+                txn.execute(
+                    "SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?",
+                    (user_id, room_id),
+                )
+                existing_tags, = txn.fetchone()
+                if not existing_tags:
+                    self._simple_insert_txn(
+                        txn,
+                        table="room_tags_change_revisions",
+                        values={
+                            "user_id": user_id,
+                            "room_id": room_id,
+                            "stream_id": next_id,
+                            "change": TAG_CHANGE_ALL_REMOVED,
+                        }
+                    )
             self._update_revision_txn(txn, user_id, room_id, next_id)
 
         with self._account_data_id_gen.get_next() as next_id:
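
Note (not part of the diff): the net-change collapse in get_room_tags_changed can be exercised standalone; the sample rows below are invented.

from collections import Counter

TAG_CHANGE_NEWLY_TAGGED = "newly_tagged"
TAG_CHANGE_ALL_REMOVED = "all_removed"

# Rows as stored in room_tags_change_revisions, oldest first (invented).
rows = [
    ("!a:hs", TAG_CHANGE_NEWLY_TAGGED),
    ("!a:hs", TAG_CHANGE_ALL_REMOVED),   # tagged then untagged: net zero
    ("!b:hs", TAG_CHANGE_NEWLY_TAGGED),  # stays newly tagged
]

results = Counter()
for room_id, change in rows:
    results[room_id] += 1 if change == TAG_CHANGE_NEWLY_TAGGED else -1

collapsed = {
    room_id: TAG_CHANGE_NEWLY_TAGGED if count > 0 else TAG_CHANGE_ALL_REMOVED
    for room_id, count in results.items()
    if count
}
assert collapsed == {"!b:hs": TAG_CHANGE_NEWLY_TAGGED}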

View File

@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from synapse.api.errors import SynapseError
-from synapse.types import StreamToken
+from synapse.types import StreamToken, SyncNextBatchToken
 
 import logging
@@ -72,14 +72,18 @@ class PaginationConfig(object):
         if direction not in ['f', 'b']:
             raise SynapseError(400, "'dir' parameter is invalid.")
 
-        from_tok = get_param("from")
+        raw_from_tok = get_param("from")
         to_tok = get_param("to")
 
         try:
-            if from_tok == "END":
+            from_tok = None
+            if raw_from_tok == "END":
                 from_tok = None  # For backwards compat.
-            elif from_tok:
-                from_tok = StreamToken.from_string(from_tok)
+            elif raw_from_tok:
+                try:
+                    from_tok = SyncNextBatchToken.from_string(raw_from_tok).stream_token
+                except:
+                    from_tok = StreamToken.from_string(raw_from_tok)
         except:
             raise SynapseError(400, "'from' paramater is invalid")

View File

@@ -17,6 +17,9 @@ from synapse.api.errors import SynapseError
 from collections import namedtuple
 
+from unpaddedbase64 import encode_base64, decode_base64
+import cbor2 as serializer
+
 
 Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
@@ -115,8 +118,71 @@ class EventID(DomainSpecificString):
     SIGIL = "$"
 
 
+class SyncNextBatchToken(
+    namedtuple("SyncNextBatchToken", (
+        "stream_token",
+        "pagination_state",
+    ))
+):
+    @classmethod
+    def from_string(cls, string):
+        try:
+            d = serializer.loads(decode_base64(string))
+            pa = d.get("pa", None)
+            if pa:
+                pa = SyncPaginationState.from_dict(pa)
+            return cls(
+                stream_token=StreamToken.from_arr(d["t"]),
+                pagination_state=pa,
+            )
+        except:
+            raise SynapseError(400, "Invalid Token")
+
+    def to_string(self):
+        return encode_base64(serializer.dumps({
+            "t": self.stream_token.to_arr(),
+            "pa": self.pagination_state.to_dict() if self.pagination_state else None,
+        }))
+
+    def replace(self, **kwargs):
+        return self._replace(**kwargs)
+
+
+_ORDER_ENCODE = {"m.origin_server_ts": "o"}
+_ORDER_DECODE = {v: k for k, v in _ORDER_ENCODE.items()}
+
+_TAG_ENCODE = {"m.include_all": "i", "m.ignore": "x"}
+_TAG_DECODE = {v: k for k, v in _TAG_ENCODE.items()}
+
+
+class SyncPaginationState(
+    namedtuple("SyncPaginationState", (
+        "order",
+        "value",
+        "limit",
+        "tags",
+    ))
+):
+    @classmethod
+    def from_dict(cls, d):
+        try:
+            return cls(_ORDER_DECODE[d["o"]], d["v"], d["l"], _TAG_DECODE[d["t"]])
+        except:
+            raise SynapseError(400, "Invalid Token")
+
+    def to_dict(self):
+        return {
+            "o": _ORDER_ENCODE[self.order],
+            "v": self.value,
+            "l": self.limit,
+            "t": _TAG_ENCODE[self.tags],
+        }
+
+    def replace(self, **kwargs):
+        return self._replace(**kwargs)
+
+
 class StreamToken(
-    namedtuple("Token", (
+    namedtuple("StreamToken", (
         "room_key",
         "presence_key",
         "typing_key",
@@ -141,6 +207,20 @@ class StreamToken(
     def to_string(self):
         return self._SEPARATOR.join([str(k) for k in self])
 
+    @classmethod
+    def from_arr(cls, arr):
+        try:
+            keys = arr
+            while len(keys) < len(cls._fields):
+                # i.e. old token from before receipt_key
+                keys.append("0")
+            return cls(*keys)
+        except:
+            raise SynapseError(400, "Invalid Token")
+
+    def to_arr(self):
+        return self
+
     @property
     def room_stream_id(self):
         # TODO(markjh): Awful hack to work around hacks in the presence tests
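
Note (not part of the diff): a round-trip sketch for the new batch token encoding; the stream token array and pagination values are invented, while cbor2 and unpaddedbase64 are the libraries imported at the top of this file.

import cbor2
from unpaddedbase64 import encode_base64, decode_base64

token = encode_base64(cbor2.dumps({
    "t": ["s123", "456", "789", "0", "12", "34"],  # StreamToken.to_arr(), values invented
    "pa": {"o": "o", "v": 1466700000000, "l": 20, "t": "i"},  # SyncPaginationState.to_dict()
}))

decoded = cbor2.loads(decode_base64(token))
assert decoded["pa"]["l"] == 20  # the pagination window survives the round trip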