Compare commits

...

43 Commits

Author SHA1 Message Date
Erik Johnston
d2b59f2482 Implement top-level unread_notifications 2016-06-28 10:55:54 +01:00
Erik Johnston
6c137b321d Encode batch tokens better 2016-06-27 15:21:12 +01:00
Erik Johnston
4b7abedfd9 Comments 2016-06-27 15:10:39 +01:00
Erik Johnston
f07f99387e Use cbor 2016-06-27 14:24:50 +01:00
Erik Johnston
4c67e06dfb Use JSON instead of msgpack 2016-06-27 13:18:04 +01:00
Erik Johnston
92c58932d1 More logging 2016-06-27 11:51:06 +01:00
Erik Johnston
3263e12d73 Try serializing as json rather than msgpack 2016-06-27 11:24:58 +01:00
Erik Johnston
c0b2f33dc2 Logging 2016-06-27 10:34:52 +01:00
Erik Johnston
3ace9bdff9 Empty commit 2016-06-24 16:34:37 +01:00
Erik Johnston
434c51d538 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-06-24 16:09:23 +01:00
Erik Johnston
a72919b748 Add get_last_event_id_ts_for_room to slave DataStore 2016-06-24 13:54:06 +01:00
Erik Johnston
62050d2dfb Comments 2016-06-24 11:11:53 +01:00
Erik Johnston
bf0edf7a16 Make jenkins-unittests.sh install deps 2016-06-24 11:05:09 +01:00
Erik Johnston
9df5f81687 Make get_room_tags_changed take a now position. Comments 2016-06-23 17:50:30 +01:00
Erik Johnston
a7e6ad9f3e Use SyncExtras 2016-06-23 17:26:27 +01:00
Erik Johnston
6c8c061c2f Move stuff into separate function 2016-06-23 16:25:11 +01:00
Erik Johnston
7b3324e252 Get rid of per room full_state flag 2016-06-23 15:48:33 +01:00
Erik Johnston
8c3fca8b28 Correctly handle tags changing in paginated sync 2016-06-23 13:43:25 +01:00
Erik Johnston
a90140358b Change default tag handling 2016-06-23 10:40:43 +01:00
Erik Johnston
baab93b0dd Implement 'synced' flag 2016-06-22 11:40:06 +01:00
Erik Johnston
839088e2e7 Support streaming peek 2016-06-22 11:02:27 +01:00
Erik Johnston
6a101e512f Add tag handling 2016-06-22 10:59:24 +01:00
Erik Johnston
cdd379b6df Use msgpack for shorter tokens 2016-06-21 11:36:28 +01:00
Erik Johnston
3b6027dbc1 Always include tags 2016-06-21 11:18:09 +01:00
Erik Johnston
6992fb9bc1 Implement error responses 2016-06-21 10:29:44 +01:00
Erik Johnston
22dea0ca37 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-06-20 14:39:58 +01:00
Erik Johnston
96d6fff447 Fix 'A next_batch token can be used in the v1 messages API' 2016-06-16 11:33:53 +01:00
Erik Johnston
2b0f9bddcf Merge branch 'develop' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-06-16 10:40:17 +01:00
Erik Johnston
3b52bd1cf6 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-06-14 15:22:21 +01:00
Erik Johnston
e5b3034fc4 Indicate if /sync was limited or not 2016-05-25 17:00:59 +01:00
Erik Johnston
43cbde4653 Basic extra include pagination impl 2016-05-25 15:54:32 +01:00
Erik Johnston
26c7f08465 Implement basic pagination 2016-05-25 10:14:38 +01:00
Erik Johnston
4902770e32 Merge branch 'erikj/sync_refactor' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-05-24 16:28:51 +01:00
Erik Johnston
38d90e0d7d Add POST /sync API endpoint 2016-05-20 14:42:25 +01:00
Erik Johnston
99a7205093 Change name 2016-05-20 11:11:42 +01:00
Erik Johnston
5941346c5b Merge branch 'develop' of github.com:matrix-org/synapse into erikj/paginate_sync 2016-05-18 11:38:24 +01:00
Erik Johnston
573e51cc0b Correctly order recents 2016-05-18 11:33:26 +01:00
Erik Johnston
39182c3594 Typo 2016-05-18 11:30:01 +01:00
Erik Johnston
b999adcaa2 Filter before ordering 2016-05-18 11:28:26 +01:00
Erik Johnston
d1e9655f75 Call get_last_ts less 2016-05-17 15:37:48 +01:00
Erik Johnston
64df836067 Correctly figure out which rooms we've sent down 2016-05-17 14:23:13 +01:00
Erik Johnston
32d476d4f1 Change token format 2016-05-16 16:59:18 +01:00
Erik Johnston
a2decbdd66 Only load the last N joined room 2016-05-16 13:31:22 +01:00
13 changed files with 803 additions and 65 deletions

View File

@@ -20,6 +20,10 @@ export DUMP_COVERAGE_COMMAND="coverage help"
# UNSTABLE or FAILURE this build.
export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
TOX_BIN=$WORKSPACE/.tox/py27/bin
python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
$TOX_BIN/pip install lxml
rm .coverage* || echo "No coverage files to remove"
tox -e py27

View File

@@ -44,6 +44,7 @@ class Codes(object):
THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED"
THREEPID_IN_USE = "THREEPID_IN_USE"
INVALID_USERNAME = "M_INVALID_USERNAME"
CANNOT_PEEK = "M_CANNOT_PEEK"
class CodeMessageException(RuntimeError):

View File

@@ -20,6 +20,9 @@ from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
from synapse.types import SyncNextBatchToken, SyncPaginationState
from synapse.api.errors import Codes, SynapseError
from synapse.storage.tags import (TAG_CHANGE_NEWLY_TAGGED, TAG_CHANGE_ALL_REMOVED)
from twisted.internet import defer
@@ -35,9 +38,48 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"filter_collection",
"is_guest",
"request_key",
"pagination_config",
])
class SyncPaginationConfig(collections.namedtuple("SyncPaginationConfig", [
"order",
"limit",
"tags",
])):
"Initial pagination configuration from initial sync."
def __init__(self, order, limit, tags):
if order not in SYNC_PAGINATION_VALID_ORDERS:
raise SynapseError(400, "Invalid 'order'")
if tags not in SYNC_PAGINATION_VALID_TAGS_OPTIONS:
raise SynapseError(400, "Invalid 'tags'")
try:
limit = int(limit)
except:
raise SynapseError(400, "Invalid 'limit'")
super(SyncPaginationConfig, self).__init__(order, limit, tags)
SYNC_PAGINATION_TAGS_INCLUDE_ALL = "m.include_all"
SYNC_PAGINATION_TAGS_IGNORE = "m.ignore"
SYNC_PAGINATION_VALID_TAGS_OPTIONS = (
SYNC_PAGINATION_TAGS_INCLUDE_ALL, SYNC_PAGINATION_TAGS_IGNORE,
)
SYNC_PAGINATION_ORDER_TS = "m.origin_server_ts"
SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
SyncExtras = collections.namedtuple("SyncExtras", [
"paginate", # dict with "limit" key
"peek", # dict of room_id -> dict
])
DEFAULT_SYNC_EXTRAS = SyncExtras(paginate={}, peek={})
class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch",
"events",
@@ -59,6 +101,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"ephemeral",
"account_data",
"unread_notifications",
"synced", # bool
])):
__slots__ = []
@@ -106,6 +149,18 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
return True
class ErrorSyncResult(collections.namedtuple("ErrorSyncResult", [
"room_id", # str
"errcode", # str
"error", # str
])):
__slots__ = []
def __nonzero__(self):
"""Errors should always be reported to the client"""
return True
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"presence", # List of presence events for the user.
@@ -113,6 +168,9 @@ class SyncResult(collections.namedtuple("SyncResult", [
"joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
"errors", # ErrorSyncResult
"pagination_info",
"unread_notifications",
])):
__slots__ = []
@@ -140,8 +198,8 @@ class SyncHandler(object):
self.clock = hs.get_clock()
self.response_cache = ResponseCache()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
full_state=False, extras=DEFAULT_SYNC_EXTRAS):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
@@ -153,48 +211,42 @@ class SyncHandler(object):
result = self.response_cache.set(
sync_config.request_key,
self._wait_for_sync_for_user(
sync_config, since_token, timeout, full_state
sync_config, batch_token, timeout, full_state, extras,
)
)
return result
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
full_state):
def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
full_state, extras=DEFAULT_SYNC_EXTRAS):
context = LoggingContext.current_context()
if context:
if since_token is None:
if batch_token is None:
context.tag = "initial_sync"
elif full_state:
context.tag = "full_state_sync"
else:
context.tag = "incremental_sync"
if timeout == 0 or since_token is None or full_state:
if timeout == 0 or batch_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = yield self.current_sync_for_user(
sync_config, since_token, full_state=full_state,
result = yield self.generate_sync_result(
sync_config, batch_token, full_state=full_state, extras=extras,
)
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
return self.generate_sync_result(
sync_config, batch_token, full_state=False, extras=extras,
)
result = yield self.notifier.wait_for_events(
sync_config.user.to_string(), timeout, current_sync_callback,
from_token=since_token,
from_token=batch_token.stream_token,
)
defer.returnValue(result)
def current_sync_for_user(self, sync_config, since_token=None,
full_state=False):
"""Get the sync for client needed to match what the server has now.
Returns:
A Deferred SyncResult.
"""
return self.generate_sync_result(sync_config, since_token, full_state)
@defer.inlineCallbacks
def push_rules_for_user(self, user):
user_id = user.to_string()
@@ -490,13 +542,15 @@ class SyncHandler(object):
defer.returnValue(None)
@defer.inlineCallbacks
def generate_sync_result(self, sync_config, since_token=None, full_state=False):
def generate_sync_result(self, sync_config, batch_token=None, full_state=False,
extras=DEFAULT_SYNC_EXTRAS):
"""Generates a sync result.
Args:
sync_config (SyncConfig)
since_token (StreamToken)
full_state (bool)
extras (SyncExtras)
Returns:
Deferred(SyncResult)
@@ -508,10 +562,16 @@ class SyncHandler(object):
# Always use the `now_token` in `SyncResultBuilder`
now_token = yield self.event_sources.get_current_token()
all_joined_rooms = yield self.store.get_rooms_for_user(
sync_config.user.to_string()
)
all_joined_rooms = [room.room_id for room in all_joined_rooms]
sync_result_builder = SyncResultBuilder(
sync_config, full_state,
since_token=since_token,
batch_token=batch_token,
now_token=now_token,
all_joined_rooms=all_joined_rooms,
)
account_data_by_room = yield self._generate_sync_entry_for_account_data(
@@ -519,7 +579,7 @@ class SyncHandler(object):
)
res = yield self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room
sync_result_builder, account_data_by_room, extras,
)
newly_joined_rooms, newly_joined_users = res
@@ -527,15 +587,55 @@ class SyncHandler(object):
sync_result_builder, newly_joined_rooms, newly_joined_users
)
yield self._generate_notification_counts(sync_result_builder)
defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
joined=sync_result_builder.joined,
invited=sync_result_builder.invited,
archived=sync_result_builder.archived,
next_batch=sync_result_builder.now_token,
errors=sync_result_builder.errors,
next_batch=SyncNextBatchToken(
stream_token=sync_result_builder.now_token,
pagination_state=sync_result_builder.pagination_state,
),
pagination_info=sync_result_builder.pagination_info,
unread_notifications=sync_result_builder.unread_notifications,
))
@defer.inlineCallbacks
def _generate_notification_counts(self, sync_result_builder):
rooms = sync_result_builder.all_joined_rooms
total_notif_count = [0]
rooms_with_notifs = set()
total_highlight_count = [0]
rooms_with_highlights = set()
@defer.inlineCallbacks
def notif_for_room(room_id):
notifs = yield self.unread_notifs_for_room_id(
room_id, sync_result_builder.sync_config
)
if notifs is not None:
total_notif_count[0] += notifs["notify_count"]
total_highlight_count[0] += notifs["highlight_count"]
if notifs["notify_count"]:
rooms_with_notifs.add(room_id)
if notifs["highlight_count"]:
rooms_with_highlights.add(room_id)
yield concurrently_execute(notif_for_room, rooms, 10)
sync_result_builder.unread_notifications = {
"total_notification_count": total_notif_count[0],
"rooms_notification_count": len(rooms_with_notifs),
"total_highlight_count": total_highlight_count[0],
"rooms_highlight_count": len(rooms_with_highlights),
}
@defer.inlineCallbacks
def _generate_sync_entry_for_account_data(self, sync_result_builder):
"""Generates the account data portion of the sync response. Populates
@@ -646,7 +746,8 @@ class SyncHandler(object):
sync_result_builder.presence = presence
@defer.inlineCallbacks
def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room,
extras):
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -690,6 +791,12 @@ class SyncHandler(object):
tags_by_room = yield self.store.get_tags_for_user(user_id)
yield self._update_room_entries_for_paginated_sync(
sync_result_builder, room_entries, extras
)
sync_result_builder.full_state |= sync_result_builder.since_token is None
def handle_room_entries(room_entry):
return self._generate_room_entry(
sync_result_builder,
@@ -698,7 +805,6 @@ class SyncHandler(object):
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id),
account_data=account_data_by_room.get(room_entry.room_id, {}),
always_include=sync_result_builder.full_state,
)
yield concurrently_execute(handle_room_entries, room_entries, 10)
@@ -719,6 +825,162 @@ class SyncHandler(object):
defer.returnValue((newly_joined_rooms, newly_joined_users))
@defer.inlineCallbacks
def _update_room_entries_for_paginated_sync(self, sync_result_builder,
room_entries, extras):
"""Works out which room_entries should be synced to the client, which
would need to be resynced if they were sent down, etc.
Mutates room_entries.
Args:
sync_result_builder (SyncResultBuilder)
room_entries (list(RoomSyncResultBuilder))
extras (SyncExtras)
"""
user_id = sync_result_builder.sync_config.user.to_string()
sync_config = sync_result_builder.sync_config
if sync_config.pagination_config:
pagination_config = sync_config.pagination_config
old_pagination_value = 0
include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL
elif sync_result_builder.pagination_state:
pagination_config = SyncPaginationConfig(
order=sync_result_builder.pagination_state.order,
limit=sync_result_builder.pagination_state.limit,
tags=sync_result_builder.pagination_state.tags,
)
old_pagination_value = sync_result_builder.pagination_state.value
include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL
else:
pagination_config = None
old_pagination_value = 0
include_all_tags = False
if sync_result_builder.pagination_state:
missing_state = yield self._get_rooms_that_need_full_state(
room_ids=[r.room_id for r in room_entries],
sync_config=sync_config,
since_token=sync_result_builder.since_token,
pagination_state=sync_result_builder.pagination_state,
)
all_tags = yield self.store.get_tags_for_user(user_id)
if sync_result_builder.since_token:
stream_id = sync_result_builder.since_token.account_data_key
now_stream_id = sync_result_builder.now_token.account_data_key
tag_changes = yield self.store.get_room_tags_changed(
user_id, stream_id, now_stream_id
)
else:
tag_changes = {}
if missing_state:
for r in room_entries:
if r.room_id in missing_state:
if include_all_tags:
# If we're always including tagged rooms, then only
# resync rooms which are newly tagged.
change = tag_changes.get(r.room_id)
if change == TAG_CHANGE_NEWLY_TAGGED:
r.always_include = True
r.would_require_resync = True
r.synced = True
continue
elif change == TAG_CHANGE_ALL_REMOVED:
r.always_include = True
r.synced = False
continue
elif r.room_id in all_tags:
r.always_include = True
continue
if r.room_id in extras.peek:
since = extras.peek[r.room_id].get("since", None)
if since:
tok = SyncNextBatchToken.from_string(since)
r.since_token = tok.stream_token
else:
r.always_include = True
r.would_require_resync = True
r.synced = False
else:
r.would_require_resync = True
elif pagination_config and include_all_tags:
all_tags = yield self.store.get_tags_for_user(user_id)
for r in room_entries:
if r.room_id in all_tags:
r.always_include = True
for room_id in set(extras.peek.keys()) - {r.room_id for r in room_entries}:
sync_result_builder.errors.append(ErrorSyncResult(
room_id=room_id,
errcode=Codes.CANNOT_PEEK,
error="Cannot peek into requested room",
))
if pagination_config:
room_ids = [r.room_id for r in room_entries]
pagination_limit = pagination_config.limit
extra_limit = extras.paginate.get("limit", 0)
room_map = yield self._get_room_timestamps_at_token(
room_ids, sync_result_builder.now_token, sync_config,
pagination_limit + extra_limit + 1,
)
limited = False
if room_map:
sorted_list = sorted(
room_map.items(),
key=lambda item: -item[1]
)
cutoff_list = sorted_list[:pagination_limit + extra_limit]
if cutoff_list[pagination_limit:]:
new_room_ids = set(r[0] for r in cutoff_list[pagination_limit:])
for r in room_entries:
if r.room_id in new_room_ids:
r.always_include = True
r.would_require_resync = True
_, bottom_ts = cutoff_list[-1]
new_pagination_value = bottom_ts
# We're limited if there are any rooms that are after cutoff
# in the list, but still have an origin server ts from after
# the pagination value from the since token.
limited = any(
old_pagination_value < r[1]
for r in sorted_list[pagination_limit + extra_limit:]
)
sync_result_builder.pagination_state = SyncPaginationState(
order=pagination_config.order, value=new_pagination_value,
limit=pagination_limit + extra_limit,
tags=pagination_config.tags,
)
to_sync_map = dict(cutoff_list)
else:
to_sync_map = {}
sync_result_builder.pagination_info["limited"] = limited
if len(room_map) == len(room_entries):
sync_result_builder.pagination_state = None
room_entries[:] = [
r for r in room_entries
if r.room_id in to_sync_map or r.always_include
]
@defer.inlineCallbacks
def _get_rooms_changed(self, sync_result_builder, ignored_users):
"""Gets the the changes that have happened since the last sync.
@@ -809,7 +1071,6 @@ class SyncHandler(object):
rtype="archived",
events=None,
newly_joined=room_id in newly_joined_rooms,
full_state=False,
since_token=since_token,
upto_token=leave_token,
))
@@ -839,7 +1100,6 @@ class SyncHandler(object):
rtype="joined",
events=events,
newly_joined=room_id in newly_joined_rooms,
full_state=False,
since_token=None if room_id in newly_joined_rooms else since_token,
upto_token=prev_batch_token,
))
@@ -849,7 +1109,6 @@ class SyncHandler(object):
rtype="joined",
events=[],
newly_joined=room_id in newly_joined_rooms,
full_state=False,
since_token=since_token,
upto_token=since_token,
))
@@ -893,7 +1152,6 @@ class SyncHandler(object):
rtype="joined",
events=None,
newly_joined=False,
full_state=True,
since_token=since_token,
upto_token=now_token,
))
@@ -920,7 +1178,6 @@ class SyncHandler(object):
rtype="archived",
events=None,
newly_joined=False,
full_state=True,
since_token=since_token,
upto_token=leave_token,
))
@@ -929,8 +1186,7 @@ class SyncHandler(object):
@defer.inlineCallbacks
def _generate_room_entry(self, sync_result_builder, ignored_users,
room_builder, ephemeral, tags, account_data,
always_include=False):
room_builder, ephemeral, tags, account_data):
"""Populates the `joined` and `archived` section of `sync_result_builder`
based on the `room_builder`.
@@ -946,19 +1202,23 @@ class SyncHandler(object):
even if empty.
"""
newly_joined = room_builder.newly_joined
full_state = (
room_builder.full_state
or newly_joined
always_include = (
newly_joined
or sync_result_builder.full_state
or room_builder.always_include
)
full_state = (
newly_joined
or sync_result_builder.full_state
or room_builder.would_require_resync
)
events = room_builder.events
# We want to shortcut out as early as possible.
if not (always_include or account_data or ephemeral or full_state):
if not (always_include or account_data or ephemeral):
if events == [] and tags is None:
return
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
@@ -993,9 +1253,20 @@ class SyncHandler(object):
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
if not (always_include or batch or account_data or ephemeral or full_state):
if not (always_include or batch or account_data or ephemeral):
return
# At this point we're guarenteed (?) to send down the room, so if we
# need to resync the entire room do so now.
if room_builder.would_require_resync:
batch = yield self._load_filtered_recents(
room_id, sync_config,
now_token=upto_token,
since_token=None,
recents=None,
newly_joined_room=newly_joined,
)
state = yield self.compute_state_delta(
room_id, batch, sync_config, since_token, now_token,
full_state=full_state
@@ -1010,6 +1281,7 @@ class SyncHandler(object):
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
synced=room_builder.synced,
)
if room_sync or always_include:
@@ -1034,6 +1306,90 @@ class SyncHandler(object):
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
@defer.inlineCallbacks
def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit):
"""For each room, get the last origin_server_ts timestamp the client
would see (after filtering) at a particular token.
Only attempts finds the latest `limit` room timestamps.
"""
room_to_entries = {}
@defer.inlineCallbacks
def _get_last_ts(room_id):
entry = yield self.store.get_last_event_id_ts_for_room(
room_id, token.room_key
)
# TODO: Is this ever possible?
room_to_entries[room_id] = entry if entry else {
"origin_server_ts": 0,
}
yield concurrently_execute(_get_last_ts, room_ids, 10)
if len(room_to_entries) <= limit:
defer.returnValue({
room_id: entry["origin_server_ts"]
for room_id, entry in room_to_entries.items()
})
queued_events = sorted(
room_to_entries.items(),
key=lambda e: -e[1]["origin_server_ts"]
)
to_return = {}
while len(to_return) < limit and len(queued_events) > 0:
to_fetch = queued_events[:limit - len(to_return)]
event_to_q = {
e["event_id"]: (room_id, e) for room_id, e in to_fetch
if "event_id" in e
}
# Now we fetch each event to check if its been filtered out
event_map = yield self.store.get_events(event_to_q.keys())
recents = sync_config.filter_collection.filter_room_timeline(
event_map.values()
)
recents = yield filter_events_for_client(
self.store,
sync_config.user.to_string(),
recents,
)
to_return.update({r.room_id: r.origin_server_ts for r in recents})
for ev_id in set(event_map.keys()) - set(r.event_id for r in recents):
queued_events.append(event_to_q[ev_id])
# FIXME: Need to refetch TS
queued_events.sort(key=lambda e: -e[1]["origin_server_ts"])
defer.returnValue(to_return)
@defer.inlineCallbacks
def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token,
pagination_state):
"""Work out which rooms we haven't sent to the client yet, so would
require us to send down the full state
"""
start_ts = yield self._get_room_timestamps_at_token(
room_ids, since_token,
sync_config=sync_config,
limit=len(room_ids),
)
missing_list = frozenset(
room_id for room_id, ts in
sorted(start_ts.items(), key=lambda item: -item[1])
if ts < pagination_state.value
)
defer.returnValue(missing_list)
def _action_has_highlight(actions):
for action in actions:
@@ -1085,31 +1441,53 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
class SyncResultBuilder(object):
"Used to help build up a new SyncResult for a user"
def __init__(self, sync_config, full_state, since_token, now_token):
__slots__ = (
"sync_config", "full_state", "batch_token", "since_token", "pagination_state",
"now_token", "presence", "account_data", "joined", "invited", "archived",
"pagination_info", "errors", "all_joined_rooms", "unread_notifications",
)
def __init__(self, sync_config, full_state, batch_token, now_token,
all_joined_rooms):
"""
Args:
sync_config(SyncConfig)
full_state(bool): The full_state flag as specified by user
since_token(StreamToken): The token supplied by user, or None.
batch_token(SyncNextBatchToken): The token supplied by user, or None.
now_token(StreamToken): The token to sync up to.
all_joined_rooms(list(str)): List of all joined room ids.
"""
self.sync_config = sync_config
self.full_state = full_state
self.since_token = since_token
self.batch_token = batch_token
self.since_token = batch_token.stream_token if batch_token else None
self.pagination_state = batch_token.pagination_state if batch_token else None
self.now_token = now_token
self.all_joined_rooms = all_joined_rooms
self.presence = []
self.account_data = []
self.joined = []
self.invited = []
self.archived = []
self.errors = []
self.pagination_info = {}
self.unread_notifications = {}
class RoomSyncResultBuilder(object):
"""Stores information needed to create either a `JoinedSyncResult` or
`ArchivedSyncResult`.
"""
def __init__(self, room_id, rtype, events, newly_joined, full_state,
__slots__ = (
"room_id", "rtype", "events", "newly_joined", "since_token",
"upto_token", "always_include", "would_require_resync", "synced",
)
def __init__(self, room_id, rtype, events, newly_joined,
since_token, upto_token):
"""
Args:
@@ -1118,7 +1496,6 @@ class RoomSyncResultBuilder(object):
events(list): List of events to include in the room, (more events
may be added when generating result).
newly_joined(bool): If the user has newly joined the room
full_state(bool): Whether the full state should be sent in result
since_token(StreamToken): Earliest point to return events from, or None
upto_token(StreamToken): Latest point to return events from.
"""
@@ -1126,6 +1503,12 @@ class RoomSyncResultBuilder(object):
self.rtype = rtype
self.events = events
self.newly_joined = newly_joined
self.full_state = full_state
self.since_token = since_token
self.upto_token = upto_token
# Should this room always be included in the sync?
self.always_include = False
# If we send down this room, should we send down the full state?
self.would_require_resync = False
# Should the client consider this room "synced"?
self.synced = True

View File

@@ -36,6 +36,7 @@ REQUIREMENTS = {
"blist": ["blist"],
"pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"cbor2": ["cbor2"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {

View File

@@ -51,6 +51,9 @@ class SlavedAccountDataStore(BaseSlavedStore):
get_updated_account_data_for_user = (
DataStore.get_updated_account_data_for_user.__func__
)
get_room_tags_changed = (
DataStore.get_room_tags_changed.__func__
)
def get_max_account_data_stream_id(self):
return self._account_data_id_gen.get_current_token()

View File

@@ -144,6 +144,8 @@ class SlavedEventStore(BaseSlavedStore):
_get_events_around_txn = DataStore._get_events_around_txn.__func__
_get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
get_last_event_id_ts_for_room = DataStore.get_last_event_id_ts_for_room.__func__
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
result["events"] = self._stream_id_gen.get_current_token()

View File

@@ -16,10 +16,14 @@
from twisted.internet import defer
from synapse.http.servlet import (
RestServlet, parse_string, parse_integer, parse_boolean
RestServlet, parse_string, parse_integer, parse_boolean,
parse_json_object_from_request,
)
from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken
from synapse.handlers.sync import (
SyncConfig, SyncPaginationConfig, SYNC_PAGINATION_TAGS_IGNORE, SyncExtras,
DEFAULT_SYNC_EXTRAS,
)
from synapse.types import SyncNextBatchToken
from synapse.events.utils import (
serialize_event, format_event_for_client_v2_without_room_id,
)
@@ -84,6 +88,94 @@ class SyncRestServlet(RestServlet):
self.filtering = hs.get_filtering()
self.presence_handler = hs.get_presence_handler()
@defer.inlineCallbacks
def on_POST(self, request):
requester = yield self.auth.get_user_by_req(
request, allow_guest=True
)
user = requester.user
body = parse_json_object_from_request(request)
timeout = body.get("timeout", 0)
since = body.get("since", None)
extras = body.get("extras", {})
extras = SyncExtras(
paginate=extras.get("paginate", {}),
peek=extras.get("peek", {}),
)
if "from" in body:
# /events used to use 'from', but /sync uses 'since'.
# Lets be helpful and whine if we see a 'from'.
raise SynapseError(
400, "'from' is not a valid parameter. Did you mean 'since'?"
)
set_presence = body.get("set_presence", "online")
if set_presence not in self.ALLOWED_PRESENCE:
message = "Parameter 'set_presence' must be one of [%s]" % (
", ".join(repr(v) for v in self.ALLOWED_PRESENCE)
)
raise SynapseError(400, message)
full_state = body.get("full_state", False)
filter_id = body.get("filter_id", None)
filter_dict = body.get("filter", None)
pagination_config = body.get("pagination_config", None)
if filter_dict is not None and filter_id is not None:
raise SynapseError(
400,
"Can only specify one of `filter` and `filter_id` paramters"
)
if filter_id:
filter_collection = yield self.filtering.get_user_filter(
user.localpart, filter_id
)
filter_key = filter_id
elif filter_dict:
self.filtering.check_valid_filter(filter_dict)
filter_collection = FilterCollection(filter_dict)
filter_key = json.dumps(filter_dict)
else:
filter_collection = DEFAULT_FILTER_COLLECTION
filter_key = None
request_key = (user, timeout, since, filter_key, full_state)
sync_config = SyncConfig(
user=user,
filter_collection=filter_collection,
is_guest=requester.is_guest,
request_key=request_key,
pagination_config=SyncPaginationConfig(
order=pagination_config["order"],
limit=pagination_config["limit"],
tags=pagination_config.get("tags", SYNC_PAGINATION_TAGS_IGNORE),
) if pagination_config else None,
)
if since is not None:
batch_token = SyncNextBatchToken.from_string(since)
else:
batch_token = None
sync_result = yield self._handle_sync(
requester=requester,
sync_config=sync_config,
batch_token=batch_token,
set_presence=set_presence,
full_state=full_state,
timeout=timeout,
extras=extras,
)
defer.returnValue(sync_result)
@defer.inlineCallbacks
def on_GET(self, request):
if "from" in request.args:
@@ -107,13 +199,6 @@ class SyncRestServlet(RestServlet):
filter_id = parse_string(request, "filter", default=None)
full_state = parse_boolean(request, "full_state", default=False)
logger.info(
"/sync: user=%r, timeout=%r, since=%r,"
" set_presence=%r, filter_id=%r" % (
user, timeout, since, set_presence, filter_id
)
)
request_key = (user, timeout, since, filter_id, full_state)
if filter_id:
@@ -136,15 +221,39 @@ class SyncRestServlet(RestServlet):
filter_collection=filter,
is_guest=requester.is_guest,
request_key=request_key,
pagination_config=None,
)
if since is not None:
since_token = StreamToken.from_string(since)
batch_token = SyncNextBatchToken.from_string(since)
else:
since_token = None
batch_token = None
sync_result = yield self._handle_sync(
requester=requester,
sync_config=sync_config,
batch_token=batch_token,
set_presence=set_presence,
full_state=full_state,
timeout=timeout,
)
defer.returnValue(sync_result)
@defer.inlineCallbacks
def _handle_sync(self, requester, sync_config, batch_token, set_presence,
full_state, timeout, extras=DEFAULT_SYNC_EXTRAS):
affect_presence = set_presence != PresenceState.OFFLINE
user = sync_config.user
logger.info(
"/sync: user=%r, timeout=%r, since=%r,"
" set_presence=%r" % (
user, timeout, batch_token, set_presence
)
)
if affect_presence:
yield self.presence_handler.set_state(user, {"presence": set_presence})
@@ -153,8 +262,8 @@ class SyncRestServlet(RestServlet):
)
with context:
sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, since_token=since_token, timeout=timeout,
full_state=full_state
sync_config, batch_token=batch_token, timeout=timeout,
full_state=full_state, extras=extras,
)
time_now = self.clock.time_msec()
@@ -182,8 +291,15 @@ class SyncRestServlet(RestServlet):
"leave": archived,
},
"next_batch": sync_result.next_batch.to_string(),
"unread_notifications": sync_result.unread_notifications,
}
if sync_result.errors:
response_content["rooms"]["errors"] = self.encode_errors(sync_result.errors)
if sync_result.pagination_info:
response_content["pagination_info"] = sync_result.pagination_info
defer.returnValue((200, response_content))
def encode_presence(self, events, time_now):
@@ -194,6 +310,15 @@ class SyncRestServlet(RestServlet):
formatted.append(event)
return {"events": formatted}
def encode_errors(self, errors):
return {
e.room_id: {
"errcode": e.errcode,
"error": e.error
}
for e in errors
}
def encode_joined(self, rooms, time_now, token_id):
"""
Encode the joined rooms in a sync result
@@ -215,6 +340,7 @@ class SyncRestServlet(RestServlet):
joined[room.room_id] = self.encode_room(
room, time_now, token_id
)
joined[room.room_id]["synced"] = room.synced
return joined

View File

@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 32
SCHEMA_VERSION = 33
dir_path = os.path.abspath(os.path.dirname(__file__))

View File

@@ -0,0 +1,24 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE room_tags_change_revisions(
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
stream_id BIGINT NOT NULL,
change TEXT NOT NULL
);
CREATE INDEX room_tags_change_revisions_rm_idx ON room_tags_change_revisions(user_id, room_id, stream_id);
CREATE INDEX room_tags_change_revisions_idx ON room_tags_change_revisions(user_id, stream_id);

View File

@@ -525,6 +525,36 @@ class StreamStore(SQLBaseStore):
int(stream),
)
def get_last_event_id_ts_for_room(self, room_id, token):
"""Get the latest event_id and origin_server_ts for a room_id before a
given token.
Args:
room_id (str)
token (str)
Returns:
Dictionary with ``event_id`` and ``origin_server_ts`` keys.
"""
stream_ordering = RoomStreamToken.parse_stream_token(token).stream
sql = (
"SELECT event_id, origin_server_ts FROM events"
" WHERE room_id = ? AND stream_ordering <= ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT 1"
)
def f(txn):
txn.execute(sql, (room_id, stream_ordering))
rows = self.cursor_to_dict(txn)
if rows:
return rows[0]
else:
return None
return self.runInteraction("get_last_event_id_ts_for_room", f)
@defer.inlineCallbacks
def get_events_around(self, room_id, event_id, before_limit, after_limit):
"""Retrieve events and pagination tokens around a given event in a

View File

@@ -17,12 +17,18 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from twisted.internet import defer
from collections import Counter
import ujson as json
import logging
logger = logging.getLogger(__name__)
TAG_CHANGE_NEWLY_TAGGED = "newly_tagged"
TAG_CHANGE_ALL_REMOVED = "all_removed"
class TagsStore(SQLBaseStore):
def get_max_account_data_stream_id(self):
"""Get the current max stream id for the private user data stream
@@ -170,6 +176,45 @@ class TagsStore(SQLBaseStore):
row["tag"]: json.loads(row["content"]) for row in rows
})
def get_room_tags_changed(self, user_id, stream_id, now_id):
"""Returns the rooms that have been newly tagged or had all their tags
removed since `stream_id`.
Collapses multiple changes into one. For example, if a room has gone
from untagged to tagged back to untagged, the room_id won't be returned.
"""
changed = self._account_data_stream_cache.has_entity_changed(
user_id, int(stream_id)
)
if not changed:
return {}
def _get_room_tags_changed(txn):
txn.execute(
"SELECT room_id, change FROM room_tags_change_revisions"
" WHERE user_id = ? AND stream_id > ? AND stream_id <= ?",
(user_id, stream_id, now_id)
)
results = Counter()
for room_id, change in txn.fetchall():
if change == TAG_CHANGE_NEWLY_TAGGED:
results[room_id] += 1
elif change == TAG_CHANGE_ALL_REMOVED:
results[room_id] -= 1
else:
logger.warn("Unexpected tag change: %r", change)
return {
room_id: TAG_CHANGE_NEWLY_TAGGED if count > 0 else TAG_CHANGE_ALL_REMOVED
for room_id, count in results.items()
if count
}
return self.runInteraction("get_room_tags_changed", _get_room_tags_changed)
@defer.inlineCallbacks
def add_tag_to_room(self, user_id, room_id, tag, content):
"""Add a tag to a room for a user.
@@ -184,6 +229,12 @@ class TagsStore(SQLBaseStore):
content_json = json.dumps(content)
def add_tag_txn(txn, next_id):
txn.execute(
"SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?",
(user_id, room_id),
)
existing_tags, = txn.fetchone()
self._simple_upsert_txn(
txn,
table="room_tags",
@@ -197,6 +248,17 @@ class TagsStore(SQLBaseStore):
}
)
self._update_revision_txn(txn, user_id, room_id, next_id)
if not existing_tags:
self._simple_insert_txn(
txn,
table="room_tags_change_revisions",
values={
"user_id": user_id,
"room_id": room_id,
"stream_id": next_id,
"change": TAG_CHANGE_NEWLY_TAGGED,
}
)
with self._account_data_id_gen.get_next() as next_id:
yield self.runInteraction("add_tag", add_tag_txn, next_id)
@@ -218,6 +280,24 @@ class TagsStore(SQLBaseStore):
" WHERE user_id = ? AND room_id = ? AND tag = ?"
)
txn.execute(sql, (user_id, room_id, tag))
if txn.rowcount > 0:
txn.execute(
"SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?",
(user_id, room_id),
)
existing_tags, = txn.fetchone()
if not existing_tags:
self._simple_insert_txn(
txn,
table="room_tags_change_revisions",
values={
"user_id": user_id,
"room_id": room_id,
"stream_id": next_id,
"change": TAG_CHANGE_ALL_REMOVED,
}
)
self._update_revision_txn(txn, user_id, room_id, next_id)
with self._account_data_id_gen.get_next() as next_id:

View File

@@ -14,7 +14,7 @@
# limitations under the License.
from synapse.api.errors import SynapseError
from synapse.types import StreamToken
from synapse.types import StreamToken, SyncNextBatchToken
import logging
@@ -72,14 +72,18 @@ class PaginationConfig(object):
if direction not in ['f', 'b']:
raise SynapseError(400, "'dir' parameter is invalid.")
from_tok = get_param("from")
raw_from_tok = get_param("from")
to_tok = get_param("to")
try:
if from_tok == "END":
from_tok = None
if raw_from_tok == "END":
from_tok = None # For backwards compat.
elif from_tok:
from_tok = StreamToken.from_string(from_tok)
elif raw_from_tok:
try:
from_tok = SyncNextBatchToken.from_string(raw_from_tok).stream_token
except:
from_tok = StreamToken.from_string(raw_from_tok)
except:
raise SynapseError(400, "'from' paramater is invalid")

View File

@@ -17,6 +17,9 @@ from synapse.api.errors import SynapseError
from collections import namedtuple
from unpaddedbase64 import encode_base64, decode_base64
import cbor2 as serializer
Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
@@ -115,8 +118,71 @@ class EventID(DomainSpecificString):
SIGIL = "$"
class SyncNextBatchToken(
namedtuple("SyncNextBatchToken", (
"stream_token",
"pagination_state",
))
):
@classmethod
def from_string(cls, string):
try:
d = serializer.loads(decode_base64(string))
pa = d.get("pa", None)
if pa:
pa = SyncPaginationState.from_dict(pa)
return cls(
stream_token=StreamToken.from_arr(d["t"]),
pagination_state=pa,
)
except:
raise SynapseError(400, "Invalid Token")
def to_string(self):
return encode_base64(serializer.dumps({
"t": self.stream_token.to_arr(),
"pa": self.pagination_state.to_dict() if self.pagination_state else None,
}))
def replace(self, **kwargs):
return self._replace(**kwargs)
_ORDER_ENCODE = {"m.origin_server_ts": "o"}
_ORDER_DECODE = {v: k for k, v in _ORDER_ENCODE.items()}
_TAG_ENCODE = {"m.include_all": "i", "m.ignore": "x"}
_TAG_DECODE = {v: k for k, v in _TAG_ENCODE.items()}
class SyncPaginationState(
namedtuple("SyncPaginationState", (
"order",
"value",
"limit",
"tags",
))
):
@classmethod
def from_dict(cls, d):
try:
return cls(_ORDER_DECODE[d["o"]], d["v"], d["l"], _TAG_DECODE[d["t"]])
except:
raise SynapseError(400, "Invalid Token")
def to_dict(self):
return {
"o": _ORDER_ENCODE[self.order],
"v": self.value,
"l": self.limit,
"t": _TAG_ENCODE[self.tags],
}
def replace(self, **kwargs):
return self._replace(**kwargs)
class StreamToken(
namedtuple("Token", (
namedtuple("StreamToken", (
"room_key",
"presence_key",
"typing_key",
@@ -141,6 +207,20 @@ class StreamToken(
def to_string(self):
return self._SEPARATOR.join([str(k) for k in self])
@classmethod
def from_arr(cls, arr):
try:
keys = arr
while len(keys) < len(cls._fields):
# i.e. old token from before receipt_key
keys.append("0")
return cls(*keys)
except:
raise SynapseError(400, "Invalid Token")
def to_arr(self):
return self
@property
def room_stream_id(self):
# TODO(markjh): Awful hack to work around hacks in the presence tests