mirror of
https://github.com/element-hq/synapse.git
synced 2025-12-05 01:10:13 +00:00
Compare commits
43 Commits
dmr/reject
...
erikj/pagi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d2b59f2482 | ||
|
|
6c137b321d | ||
|
|
4b7abedfd9 | ||
|
|
f07f99387e | ||
|
|
4c67e06dfb | ||
|
|
92c58932d1 | ||
|
|
3263e12d73 | ||
|
|
c0b2f33dc2 | ||
|
|
3ace9bdff9 | ||
|
|
434c51d538 | ||
|
|
a72919b748 | ||
|
|
62050d2dfb | ||
|
|
bf0edf7a16 | ||
|
|
9df5f81687 | ||
|
|
a7e6ad9f3e | ||
|
|
6c8c061c2f | ||
|
|
7b3324e252 | ||
|
|
8c3fca8b28 | ||
|
|
a90140358b | ||
|
|
baab93b0dd | ||
|
|
839088e2e7 | ||
|
|
6a101e512f | ||
|
|
cdd379b6df | ||
|
|
3b6027dbc1 | ||
|
|
6992fb9bc1 | ||
|
|
22dea0ca37 | ||
|
|
96d6fff447 | ||
|
|
2b0f9bddcf | ||
|
|
3b52bd1cf6 | ||
|
|
e5b3034fc4 | ||
|
|
43cbde4653 | ||
|
|
26c7f08465 | ||
|
|
4902770e32 | ||
|
|
38d90e0d7d | ||
|
|
99a7205093 | ||
|
|
5941346c5b | ||
|
|
573e51cc0b | ||
|
|
39182c3594 | ||
|
|
b999adcaa2 | ||
|
|
d1e9655f75 | ||
|
|
64df836067 | ||
|
|
32d476d4f1 | ||
|
|
a2decbdd66 |
@@ -20,6 +20,10 @@ export DUMP_COVERAGE_COMMAND="coverage help"
|
||||
# UNSTABLE or FAILURE this build.
|
||||
export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
|
||||
|
||||
TOX_BIN=$WORKSPACE/.tox/py27/bin
|
||||
python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
|
||||
$TOX_BIN/pip install lxml
|
||||
|
||||
rm .coverage* || echo "No coverage files to remove"
|
||||
|
||||
tox -e py27
|
||||
|
||||
@@ -44,6 +44,7 @@ class Codes(object):
|
||||
THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED"
|
||||
THREEPID_IN_USE = "THREEPID_IN_USE"
|
||||
INVALID_USERNAME = "M_INVALID_USERNAME"
|
||||
CANNOT_PEEK = "M_CANNOT_PEEK"
|
||||
|
||||
|
||||
class CodeMessageException(RuntimeError):
|
||||
|
||||
@@ -20,6 +20,9 @@ from synapse.util.metrics import Measure
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.push.clientformat import format_push_rules_for_user
|
||||
from synapse.visibility import filter_events_for_client
|
||||
from synapse.types import SyncNextBatchToken, SyncPaginationState
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.storage.tags import (TAG_CHANGE_NEWLY_TAGGED, TAG_CHANGE_ALL_REMOVED)
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
@@ -35,9 +38,48 @@ SyncConfig = collections.namedtuple("SyncConfig", [
|
||||
"filter_collection",
|
||||
"is_guest",
|
||||
"request_key",
|
||||
"pagination_config",
|
||||
])
|
||||
|
||||
|
||||
class SyncPaginationConfig(collections.namedtuple("SyncPaginationConfig", [
|
||||
"order",
|
||||
"limit",
|
||||
"tags",
|
||||
])):
|
||||
"Initial pagination configuration from initial sync."
|
||||
def __init__(self, order, limit, tags):
|
||||
if order not in SYNC_PAGINATION_VALID_ORDERS:
|
||||
raise SynapseError(400, "Invalid 'order'")
|
||||
if tags not in SYNC_PAGINATION_VALID_TAGS_OPTIONS:
|
||||
raise SynapseError(400, "Invalid 'tags'")
|
||||
|
||||
try:
|
||||
limit = int(limit)
|
||||
except:
|
||||
raise SynapseError(400, "Invalid 'limit'")
|
||||
|
||||
super(SyncPaginationConfig, self).__init__(order, limit, tags)
|
||||
|
||||
|
||||
SYNC_PAGINATION_TAGS_INCLUDE_ALL = "m.include_all"
|
||||
SYNC_PAGINATION_TAGS_IGNORE = "m.ignore"
|
||||
SYNC_PAGINATION_VALID_TAGS_OPTIONS = (
|
||||
SYNC_PAGINATION_TAGS_INCLUDE_ALL, SYNC_PAGINATION_TAGS_IGNORE,
|
||||
)
|
||||
|
||||
SYNC_PAGINATION_ORDER_TS = "m.origin_server_ts"
|
||||
SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
|
||||
|
||||
|
||||
SyncExtras = collections.namedtuple("SyncExtras", [
|
||||
"paginate", # dict with "limit" key
|
||||
"peek", # dict of room_id -> dict
|
||||
])
|
||||
|
||||
DEFAULT_SYNC_EXTRAS = SyncExtras(paginate={}, peek={})
|
||||
|
||||
|
||||
class TimelineBatch(collections.namedtuple("TimelineBatch", [
|
||||
"prev_batch",
|
||||
"events",
|
||||
@@ -59,6 +101,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
|
||||
"ephemeral",
|
||||
"account_data",
|
||||
"unread_notifications",
|
||||
"synced", # bool
|
||||
])):
|
||||
__slots__ = []
|
||||
|
||||
@@ -106,6 +149,18 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
|
||||
return True
|
||||
|
||||
|
||||
class ErrorSyncResult(collections.namedtuple("ErrorSyncResult", [
|
||||
"room_id", # str
|
||||
"errcode", # str
|
||||
"error", # str
|
||||
])):
|
||||
__slots__ = []
|
||||
|
||||
def __nonzero__(self):
|
||||
"""Errors should always be reported to the client"""
|
||||
return True
|
||||
|
||||
|
||||
class SyncResult(collections.namedtuple("SyncResult", [
|
||||
"next_batch", # Token for the next sync
|
||||
"presence", # List of presence events for the user.
|
||||
@@ -113,6 +168,9 @@ class SyncResult(collections.namedtuple("SyncResult", [
|
||||
"joined", # JoinedSyncResult for each joined room.
|
||||
"invited", # InvitedSyncResult for each invited room.
|
||||
"archived", # ArchivedSyncResult for each archived room.
|
||||
"errors", # ErrorSyncResult
|
||||
"pagination_info",
|
||||
"unread_notifications",
|
||||
])):
|
||||
__slots__ = []
|
||||
|
||||
@@ -140,8 +198,8 @@ class SyncHandler(object):
|
||||
self.clock = hs.get_clock()
|
||||
self.response_cache = ResponseCache()
|
||||
|
||||
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
|
||||
full_state=False):
|
||||
def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
|
||||
full_state=False, extras=DEFAULT_SYNC_EXTRAS):
|
||||
"""Get the sync for a client if we have new data for it now. Otherwise
|
||||
wait for new data to arrive on the server. If the timeout expires, then
|
||||
return an empty sync result.
|
||||
@@ -153,48 +211,42 @@ class SyncHandler(object):
|
||||
result = self.response_cache.set(
|
||||
sync_config.request_key,
|
||||
self._wait_for_sync_for_user(
|
||||
sync_config, since_token, timeout, full_state
|
||||
sync_config, batch_token, timeout, full_state, extras,
|
||||
)
|
||||
)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
|
||||
full_state):
|
||||
def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
|
||||
full_state, extras=DEFAULT_SYNC_EXTRAS):
|
||||
context = LoggingContext.current_context()
|
||||
if context:
|
||||
if since_token is None:
|
||||
if batch_token is None:
|
||||
context.tag = "initial_sync"
|
||||
elif full_state:
|
||||
context.tag = "full_state_sync"
|
||||
else:
|
||||
context.tag = "incremental_sync"
|
||||
|
||||
if timeout == 0 or since_token is None or full_state:
|
||||
if timeout == 0 or batch_token is None or full_state:
|
||||
# we are going to return immediately, so don't bother calling
|
||||
# notifier.wait_for_events.
|
||||
result = yield self.current_sync_for_user(
|
||||
sync_config, since_token, full_state=full_state,
|
||||
result = yield self.generate_sync_result(
|
||||
sync_config, batch_token, full_state=full_state, extras=extras,
|
||||
)
|
||||
defer.returnValue(result)
|
||||
else:
|
||||
def current_sync_callback(before_token, after_token):
|
||||
return self.current_sync_for_user(sync_config, since_token)
|
||||
return self.generate_sync_result(
|
||||
sync_config, batch_token, full_state=False, extras=extras,
|
||||
)
|
||||
|
||||
result = yield self.notifier.wait_for_events(
|
||||
sync_config.user.to_string(), timeout, current_sync_callback,
|
||||
from_token=since_token,
|
||||
from_token=batch_token.stream_token,
|
||||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
def current_sync_for_user(self, sync_config, since_token=None,
|
||||
full_state=False):
|
||||
"""Get the sync for client needed to match what the server has now.
|
||||
Returns:
|
||||
A Deferred SyncResult.
|
||||
"""
|
||||
return self.generate_sync_result(sync_config, since_token, full_state)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def push_rules_for_user(self, user):
|
||||
user_id = user.to_string()
|
||||
@@ -490,13 +542,15 @@ class SyncHandler(object):
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def generate_sync_result(self, sync_config, since_token=None, full_state=False):
|
||||
def generate_sync_result(self, sync_config, batch_token=None, full_state=False,
|
||||
extras=DEFAULT_SYNC_EXTRAS):
|
||||
"""Generates a sync result.
|
||||
|
||||
Args:
|
||||
sync_config (SyncConfig)
|
||||
since_token (StreamToken)
|
||||
full_state (bool)
|
||||
extras (SyncExtras)
|
||||
|
||||
Returns:
|
||||
Deferred(SyncResult)
|
||||
@@ -508,10 +562,16 @@ class SyncHandler(object):
|
||||
# Always use the `now_token` in `SyncResultBuilder`
|
||||
now_token = yield self.event_sources.get_current_token()
|
||||
|
||||
all_joined_rooms = yield self.store.get_rooms_for_user(
|
||||
sync_config.user.to_string()
|
||||
)
|
||||
all_joined_rooms = [room.room_id for room in all_joined_rooms]
|
||||
|
||||
sync_result_builder = SyncResultBuilder(
|
||||
sync_config, full_state,
|
||||
since_token=since_token,
|
||||
batch_token=batch_token,
|
||||
now_token=now_token,
|
||||
all_joined_rooms=all_joined_rooms,
|
||||
)
|
||||
|
||||
account_data_by_room = yield self._generate_sync_entry_for_account_data(
|
||||
@@ -519,7 +579,7 @@ class SyncHandler(object):
|
||||
)
|
||||
|
||||
res = yield self._generate_sync_entry_for_rooms(
|
||||
sync_result_builder, account_data_by_room
|
||||
sync_result_builder, account_data_by_room, extras,
|
||||
)
|
||||
newly_joined_rooms, newly_joined_users = res
|
||||
|
||||
@@ -527,15 +587,55 @@ class SyncHandler(object):
|
||||
sync_result_builder, newly_joined_rooms, newly_joined_users
|
||||
)
|
||||
|
||||
yield self._generate_notification_counts(sync_result_builder)
|
||||
|
||||
defer.returnValue(SyncResult(
|
||||
presence=sync_result_builder.presence,
|
||||
account_data=sync_result_builder.account_data,
|
||||
joined=sync_result_builder.joined,
|
||||
invited=sync_result_builder.invited,
|
||||
archived=sync_result_builder.archived,
|
||||
next_batch=sync_result_builder.now_token,
|
||||
errors=sync_result_builder.errors,
|
||||
next_batch=SyncNextBatchToken(
|
||||
stream_token=sync_result_builder.now_token,
|
||||
pagination_state=sync_result_builder.pagination_state,
|
||||
),
|
||||
pagination_info=sync_result_builder.pagination_info,
|
||||
unread_notifications=sync_result_builder.unread_notifications,
|
||||
))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _generate_notification_counts(self, sync_result_builder):
|
||||
rooms = sync_result_builder.all_joined_rooms
|
||||
|
||||
total_notif_count = [0]
|
||||
rooms_with_notifs = set()
|
||||
total_highlight_count = [0]
|
||||
rooms_with_highlights = set()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def notif_for_room(room_id):
|
||||
notifs = yield self.unread_notifs_for_room_id(
|
||||
room_id, sync_result_builder.sync_config
|
||||
)
|
||||
if notifs is not None:
|
||||
total_notif_count[0] += notifs["notify_count"]
|
||||
total_highlight_count[0] += notifs["highlight_count"]
|
||||
|
||||
if notifs["notify_count"]:
|
||||
rooms_with_notifs.add(room_id)
|
||||
if notifs["highlight_count"]:
|
||||
rooms_with_highlights.add(room_id)
|
||||
|
||||
yield concurrently_execute(notif_for_room, rooms, 10)
|
||||
|
||||
sync_result_builder.unread_notifications = {
|
||||
"total_notification_count": total_notif_count[0],
|
||||
"rooms_notification_count": len(rooms_with_notifs),
|
||||
"total_highlight_count": total_highlight_count[0],
|
||||
"rooms_highlight_count": len(rooms_with_highlights),
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _generate_sync_entry_for_account_data(self, sync_result_builder):
|
||||
"""Generates the account data portion of the sync response. Populates
|
||||
@@ -646,7 +746,8 @@ class SyncHandler(object):
|
||||
sync_result_builder.presence = presence
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
|
||||
def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room,
|
||||
extras):
|
||||
"""Generates the rooms portion of the sync response. Populates the
|
||||
`sync_result_builder` with the result.
|
||||
|
||||
@@ -690,6 +791,12 @@ class SyncHandler(object):
|
||||
|
||||
tags_by_room = yield self.store.get_tags_for_user(user_id)
|
||||
|
||||
yield self._update_room_entries_for_paginated_sync(
|
||||
sync_result_builder, room_entries, extras
|
||||
)
|
||||
|
||||
sync_result_builder.full_state |= sync_result_builder.since_token is None
|
||||
|
||||
def handle_room_entries(room_entry):
|
||||
return self._generate_room_entry(
|
||||
sync_result_builder,
|
||||
@@ -698,7 +805,6 @@ class SyncHandler(object):
|
||||
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
|
||||
tags=tags_by_room.get(room_entry.room_id),
|
||||
account_data=account_data_by_room.get(room_entry.room_id, {}),
|
||||
always_include=sync_result_builder.full_state,
|
||||
)
|
||||
|
||||
yield concurrently_execute(handle_room_entries, room_entries, 10)
|
||||
@@ -719,6 +825,162 @@ class SyncHandler(object):
|
||||
|
||||
defer.returnValue((newly_joined_rooms, newly_joined_users))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _update_room_entries_for_paginated_sync(self, sync_result_builder,
|
||||
room_entries, extras):
|
||||
"""Works out which room_entries should be synced to the client, which
|
||||
would need to be resynced if they were sent down, etc.
|
||||
|
||||
Mutates room_entries.
|
||||
|
||||
Args:
|
||||
sync_result_builder (SyncResultBuilder)
|
||||
room_entries (list(RoomSyncResultBuilder))
|
||||
extras (SyncExtras)
|
||||
"""
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
sync_config = sync_result_builder.sync_config
|
||||
|
||||
if sync_config.pagination_config:
|
||||
pagination_config = sync_config.pagination_config
|
||||
old_pagination_value = 0
|
||||
include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL
|
||||
elif sync_result_builder.pagination_state:
|
||||
pagination_config = SyncPaginationConfig(
|
||||
order=sync_result_builder.pagination_state.order,
|
||||
limit=sync_result_builder.pagination_state.limit,
|
||||
tags=sync_result_builder.pagination_state.tags,
|
||||
)
|
||||
old_pagination_value = sync_result_builder.pagination_state.value
|
||||
include_all_tags = pagination_config.tags == SYNC_PAGINATION_TAGS_INCLUDE_ALL
|
||||
else:
|
||||
pagination_config = None
|
||||
old_pagination_value = 0
|
||||
include_all_tags = False
|
||||
|
||||
if sync_result_builder.pagination_state:
|
||||
missing_state = yield self._get_rooms_that_need_full_state(
|
||||
room_ids=[r.room_id for r in room_entries],
|
||||
sync_config=sync_config,
|
||||
since_token=sync_result_builder.since_token,
|
||||
pagination_state=sync_result_builder.pagination_state,
|
||||
)
|
||||
|
||||
all_tags = yield self.store.get_tags_for_user(user_id)
|
||||
|
||||
if sync_result_builder.since_token:
|
||||
stream_id = sync_result_builder.since_token.account_data_key
|
||||
now_stream_id = sync_result_builder.now_token.account_data_key
|
||||
tag_changes = yield self.store.get_room_tags_changed(
|
||||
user_id, stream_id, now_stream_id
|
||||
)
|
||||
else:
|
||||
tag_changes = {}
|
||||
|
||||
if missing_state:
|
||||
for r in room_entries:
|
||||
if r.room_id in missing_state:
|
||||
if include_all_tags:
|
||||
# If we're always including tagged rooms, then only
|
||||
# resync rooms which are newly tagged.
|
||||
change = tag_changes.get(r.room_id)
|
||||
if change == TAG_CHANGE_NEWLY_TAGGED:
|
||||
r.always_include = True
|
||||
r.would_require_resync = True
|
||||
r.synced = True
|
||||
continue
|
||||
elif change == TAG_CHANGE_ALL_REMOVED:
|
||||
r.always_include = True
|
||||
r.synced = False
|
||||
continue
|
||||
elif r.room_id in all_tags:
|
||||
r.always_include = True
|
||||
continue
|
||||
|
||||
if r.room_id in extras.peek:
|
||||
since = extras.peek[r.room_id].get("since", None)
|
||||
if since:
|
||||
tok = SyncNextBatchToken.from_string(since)
|
||||
r.since_token = tok.stream_token
|
||||
else:
|
||||
r.always_include = True
|
||||
r.would_require_resync = True
|
||||
r.synced = False
|
||||
else:
|
||||
r.would_require_resync = True
|
||||
|
||||
elif pagination_config and include_all_tags:
|
||||
all_tags = yield self.store.get_tags_for_user(user_id)
|
||||
|
||||
for r in room_entries:
|
||||
if r.room_id in all_tags:
|
||||
r.always_include = True
|
||||
|
||||
for room_id in set(extras.peek.keys()) - {r.room_id for r in room_entries}:
|
||||
sync_result_builder.errors.append(ErrorSyncResult(
|
||||
room_id=room_id,
|
||||
errcode=Codes.CANNOT_PEEK,
|
||||
error="Cannot peek into requested room",
|
||||
))
|
||||
|
||||
if pagination_config:
|
||||
room_ids = [r.room_id for r in room_entries]
|
||||
pagination_limit = pagination_config.limit
|
||||
|
||||
extra_limit = extras.paginate.get("limit", 0)
|
||||
|
||||
room_map = yield self._get_room_timestamps_at_token(
|
||||
room_ids, sync_result_builder.now_token, sync_config,
|
||||
pagination_limit + extra_limit + 1,
|
||||
)
|
||||
|
||||
limited = False
|
||||
if room_map:
|
||||
sorted_list = sorted(
|
||||
room_map.items(),
|
||||
key=lambda item: -item[1]
|
||||
)
|
||||
|
||||
cutoff_list = sorted_list[:pagination_limit + extra_limit]
|
||||
|
||||
if cutoff_list[pagination_limit:]:
|
||||
new_room_ids = set(r[0] for r in cutoff_list[pagination_limit:])
|
||||
for r in room_entries:
|
||||
if r.room_id in new_room_ids:
|
||||
r.always_include = True
|
||||
r.would_require_resync = True
|
||||
|
||||
_, bottom_ts = cutoff_list[-1]
|
||||
new_pagination_value = bottom_ts
|
||||
|
||||
# We're limited if there are any rooms that are after cutoff
|
||||
# in the list, but still have an origin server ts from after
|
||||
# the pagination value from the since token.
|
||||
limited = any(
|
||||
old_pagination_value < r[1]
|
||||
for r in sorted_list[pagination_limit + extra_limit:]
|
||||
)
|
||||
|
||||
sync_result_builder.pagination_state = SyncPaginationState(
|
||||
order=pagination_config.order, value=new_pagination_value,
|
||||
limit=pagination_limit + extra_limit,
|
||||
tags=pagination_config.tags,
|
||||
)
|
||||
|
||||
to_sync_map = dict(cutoff_list)
|
||||
else:
|
||||
to_sync_map = {}
|
||||
|
||||
sync_result_builder.pagination_info["limited"] = limited
|
||||
|
||||
if len(room_map) == len(room_entries):
|
||||
sync_result_builder.pagination_state = None
|
||||
|
||||
room_entries[:] = [
|
||||
r for r in room_entries
|
||||
if r.room_id in to_sync_map or r.always_include
|
||||
]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_rooms_changed(self, sync_result_builder, ignored_users):
|
||||
"""Gets the the changes that have happened since the last sync.
|
||||
@@ -809,7 +1071,6 @@ class SyncHandler(object):
|
||||
rtype="archived",
|
||||
events=None,
|
||||
newly_joined=room_id in newly_joined_rooms,
|
||||
full_state=False,
|
||||
since_token=since_token,
|
||||
upto_token=leave_token,
|
||||
))
|
||||
@@ -839,7 +1100,6 @@ class SyncHandler(object):
|
||||
rtype="joined",
|
||||
events=events,
|
||||
newly_joined=room_id in newly_joined_rooms,
|
||||
full_state=False,
|
||||
since_token=None if room_id in newly_joined_rooms else since_token,
|
||||
upto_token=prev_batch_token,
|
||||
))
|
||||
@@ -849,7 +1109,6 @@ class SyncHandler(object):
|
||||
rtype="joined",
|
||||
events=[],
|
||||
newly_joined=room_id in newly_joined_rooms,
|
||||
full_state=False,
|
||||
since_token=since_token,
|
||||
upto_token=since_token,
|
||||
))
|
||||
@@ -893,7 +1152,6 @@ class SyncHandler(object):
|
||||
rtype="joined",
|
||||
events=None,
|
||||
newly_joined=False,
|
||||
full_state=True,
|
||||
since_token=since_token,
|
||||
upto_token=now_token,
|
||||
))
|
||||
@@ -920,7 +1178,6 @@ class SyncHandler(object):
|
||||
rtype="archived",
|
||||
events=None,
|
||||
newly_joined=False,
|
||||
full_state=True,
|
||||
since_token=since_token,
|
||||
upto_token=leave_token,
|
||||
))
|
||||
@@ -929,8 +1186,7 @@ class SyncHandler(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _generate_room_entry(self, sync_result_builder, ignored_users,
|
||||
room_builder, ephemeral, tags, account_data,
|
||||
always_include=False):
|
||||
room_builder, ephemeral, tags, account_data):
|
||||
"""Populates the `joined` and `archived` section of `sync_result_builder`
|
||||
based on the `room_builder`.
|
||||
|
||||
@@ -946,19 +1202,23 @@ class SyncHandler(object):
|
||||
even if empty.
|
||||
"""
|
||||
newly_joined = room_builder.newly_joined
|
||||
full_state = (
|
||||
room_builder.full_state
|
||||
or newly_joined
|
||||
always_include = (
|
||||
newly_joined
|
||||
or sync_result_builder.full_state
|
||||
or room_builder.always_include
|
||||
)
|
||||
full_state = (
|
||||
newly_joined
|
||||
or sync_result_builder.full_state
|
||||
or room_builder.would_require_resync
|
||||
)
|
||||
events = room_builder.events
|
||||
|
||||
# We want to shortcut out as early as possible.
|
||||
if not (always_include or account_data or ephemeral or full_state):
|
||||
if not (always_include or account_data or ephemeral):
|
||||
if events == [] and tags is None:
|
||||
return
|
||||
|
||||
since_token = sync_result_builder.since_token
|
||||
now_token = sync_result_builder.now_token
|
||||
sync_config = sync_result_builder.sync_config
|
||||
|
||||
@@ -993,9 +1253,20 @@ class SyncHandler(object):
|
||||
|
||||
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
|
||||
|
||||
if not (always_include or batch or account_data or ephemeral or full_state):
|
||||
if not (always_include or batch or account_data or ephemeral):
|
||||
return
|
||||
|
||||
# At this point we're guarenteed (?) to send down the room, so if we
|
||||
# need to resync the entire room do so now.
|
||||
if room_builder.would_require_resync:
|
||||
batch = yield self._load_filtered_recents(
|
||||
room_id, sync_config,
|
||||
now_token=upto_token,
|
||||
since_token=None,
|
||||
recents=None,
|
||||
newly_joined_room=newly_joined,
|
||||
)
|
||||
|
||||
state = yield self.compute_state_delta(
|
||||
room_id, batch, sync_config, since_token, now_token,
|
||||
full_state=full_state
|
||||
@@ -1010,6 +1281,7 @@ class SyncHandler(object):
|
||||
ephemeral=ephemeral,
|
||||
account_data=account_data_events,
|
||||
unread_notifications=unread_notifications,
|
||||
synced=room_builder.synced,
|
||||
)
|
||||
|
||||
if room_sync or always_include:
|
||||
@@ -1034,6 +1306,90 @@ class SyncHandler(object):
|
||||
else:
|
||||
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit):
|
||||
"""For each room, get the last origin_server_ts timestamp the client
|
||||
would see (after filtering) at a particular token.
|
||||
|
||||
Only attempts finds the latest `limit` room timestamps.
|
||||
"""
|
||||
room_to_entries = {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_last_ts(room_id):
|
||||
entry = yield self.store.get_last_event_id_ts_for_room(
|
||||
room_id, token.room_key
|
||||
)
|
||||
|
||||
# TODO: Is this ever possible?
|
||||
room_to_entries[room_id] = entry if entry else {
|
||||
"origin_server_ts": 0,
|
||||
}
|
||||
|
||||
yield concurrently_execute(_get_last_ts, room_ids, 10)
|
||||
|
||||
if len(room_to_entries) <= limit:
|
||||
defer.returnValue({
|
||||
room_id: entry["origin_server_ts"]
|
||||
for room_id, entry in room_to_entries.items()
|
||||
})
|
||||
|
||||
queued_events = sorted(
|
||||
room_to_entries.items(),
|
||||
key=lambda e: -e[1]["origin_server_ts"]
|
||||
)
|
||||
|
||||
to_return = {}
|
||||
|
||||
while len(to_return) < limit and len(queued_events) > 0:
|
||||
to_fetch = queued_events[:limit - len(to_return)]
|
||||
event_to_q = {
|
||||
e["event_id"]: (room_id, e) for room_id, e in to_fetch
|
||||
if "event_id" in e
|
||||
}
|
||||
|
||||
# Now we fetch each event to check if its been filtered out
|
||||
event_map = yield self.store.get_events(event_to_q.keys())
|
||||
|
||||
recents = sync_config.filter_collection.filter_room_timeline(
|
||||
event_map.values()
|
||||
)
|
||||
recents = yield filter_events_for_client(
|
||||
self.store,
|
||||
sync_config.user.to_string(),
|
||||
recents,
|
||||
)
|
||||
|
||||
to_return.update({r.room_id: r.origin_server_ts for r in recents})
|
||||
|
||||
for ev_id in set(event_map.keys()) - set(r.event_id for r in recents):
|
||||
queued_events.append(event_to_q[ev_id])
|
||||
|
||||
# FIXME: Need to refetch TS
|
||||
queued_events.sort(key=lambda e: -e[1]["origin_server_ts"])
|
||||
|
||||
defer.returnValue(to_return)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token,
|
||||
pagination_state):
|
||||
"""Work out which rooms we haven't sent to the client yet, so would
|
||||
require us to send down the full state
|
||||
"""
|
||||
start_ts = yield self._get_room_timestamps_at_token(
|
||||
room_ids, since_token,
|
||||
sync_config=sync_config,
|
||||
limit=len(room_ids),
|
||||
)
|
||||
|
||||
missing_list = frozenset(
|
||||
room_id for room_id, ts in
|
||||
sorted(start_ts.items(), key=lambda item: -item[1])
|
||||
if ts < pagination_state.value
|
||||
)
|
||||
|
||||
defer.returnValue(missing_list)
|
||||
|
||||
|
||||
def _action_has_highlight(actions):
|
||||
for action in actions:
|
||||
@@ -1085,31 +1441,53 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
|
||||
|
||||
class SyncResultBuilder(object):
|
||||
"Used to help build up a new SyncResult for a user"
|
||||
def __init__(self, sync_config, full_state, since_token, now_token):
|
||||
|
||||
__slots__ = (
|
||||
"sync_config", "full_state", "batch_token", "since_token", "pagination_state",
|
||||
"now_token", "presence", "account_data", "joined", "invited", "archived",
|
||||
"pagination_info", "errors", "all_joined_rooms", "unread_notifications",
|
||||
)
|
||||
|
||||
def __init__(self, sync_config, full_state, batch_token, now_token,
|
||||
all_joined_rooms):
|
||||
"""
|
||||
Args:
|
||||
sync_config(SyncConfig)
|
||||
full_state(bool): The full_state flag as specified by user
|
||||
since_token(StreamToken): The token supplied by user, or None.
|
||||
batch_token(SyncNextBatchToken): The token supplied by user, or None.
|
||||
now_token(StreamToken): The token to sync up to.
|
||||
all_joined_rooms(list(str)): List of all joined room ids.
|
||||
"""
|
||||
self.sync_config = sync_config
|
||||
self.full_state = full_state
|
||||
self.since_token = since_token
|
||||
self.batch_token = batch_token
|
||||
self.since_token = batch_token.stream_token if batch_token else None
|
||||
self.pagination_state = batch_token.pagination_state if batch_token else None
|
||||
self.now_token = now_token
|
||||
self.all_joined_rooms = all_joined_rooms
|
||||
|
||||
self.presence = []
|
||||
self.account_data = []
|
||||
self.joined = []
|
||||
self.invited = []
|
||||
self.archived = []
|
||||
self.errors = []
|
||||
|
||||
self.pagination_info = {}
|
||||
self.unread_notifications = {}
|
||||
|
||||
|
||||
class RoomSyncResultBuilder(object):
|
||||
"""Stores information needed to create either a `JoinedSyncResult` or
|
||||
`ArchivedSyncResult`.
|
||||
"""
|
||||
def __init__(self, room_id, rtype, events, newly_joined, full_state,
|
||||
|
||||
__slots__ = (
|
||||
"room_id", "rtype", "events", "newly_joined", "since_token",
|
||||
"upto_token", "always_include", "would_require_resync", "synced",
|
||||
)
|
||||
|
||||
def __init__(self, room_id, rtype, events, newly_joined,
|
||||
since_token, upto_token):
|
||||
"""
|
||||
Args:
|
||||
@@ -1118,7 +1496,6 @@ class RoomSyncResultBuilder(object):
|
||||
events(list): List of events to include in the room, (more events
|
||||
may be added when generating result).
|
||||
newly_joined(bool): If the user has newly joined the room
|
||||
full_state(bool): Whether the full state should be sent in result
|
||||
since_token(StreamToken): Earliest point to return events from, or None
|
||||
upto_token(StreamToken): Latest point to return events from.
|
||||
"""
|
||||
@@ -1126,6 +1503,12 @@ class RoomSyncResultBuilder(object):
|
||||
self.rtype = rtype
|
||||
self.events = events
|
||||
self.newly_joined = newly_joined
|
||||
self.full_state = full_state
|
||||
self.since_token = since_token
|
||||
self.upto_token = upto_token
|
||||
|
||||
# Should this room always be included in the sync?
|
||||
self.always_include = False
|
||||
# If we send down this room, should we send down the full state?
|
||||
self.would_require_resync = False
|
||||
# Should the client consider this room "synced"?
|
||||
self.synced = True
|
||||
|
||||
@@ -36,6 +36,7 @@ REQUIREMENTS = {
|
||||
"blist": ["blist"],
|
||||
"pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
|
||||
"pymacaroons-pynacl": ["pymacaroons"],
|
||||
"cbor2": ["cbor2"],
|
||||
}
|
||||
CONDITIONAL_REQUIREMENTS = {
|
||||
"web_client": {
|
||||
|
||||
@@ -51,6 +51,9 @@ class SlavedAccountDataStore(BaseSlavedStore):
|
||||
get_updated_account_data_for_user = (
|
||||
DataStore.get_updated_account_data_for_user.__func__
|
||||
)
|
||||
get_room_tags_changed = (
|
||||
DataStore.get_room_tags_changed.__func__
|
||||
)
|
||||
|
||||
def get_max_account_data_stream_id(self):
|
||||
return self._account_data_id_gen.get_current_token()
|
||||
|
||||
@@ -144,6 +144,8 @@ class SlavedEventStore(BaseSlavedStore):
|
||||
_get_events_around_txn = DataStore._get_events_around_txn.__func__
|
||||
_get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
|
||||
|
||||
get_last_event_id_ts_for_room = DataStore.get_last_event_id_ts_for_room.__func__
|
||||
|
||||
def stream_positions(self):
|
||||
result = super(SlavedEventStore, self).stream_positions()
|
||||
result["events"] = self._stream_id_gen.get_current_token()
|
||||
|
||||
@@ -16,10 +16,14 @@
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.http.servlet import (
|
||||
RestServlet, parse_string, parse_integer, parse_boolean
|
||||
RestServlet, parse_string, parse_integer, parse_boolean,
|
||||
parse_json_object_from_request,
|
||||
)
|
||||
from synapse.handlers.sync import SyncConfig
|
||||
from synapse.types import StreamToken
|
||||
from synapse.handlers.sync import (
|
||||
SyncConfig, SyncPaginationConfig, SYNC_PAGINATION_TAGS_IGNORE, SyncExtras,
|
||||
DEFAULT_SYNC_EXTRAS,
|
||||
)
|
||||
from synapse.types import SyncNextBatchToken
|
||||
from synapse.events.utils import (
|
||||
serialize_event, format_event_for_client_v2_without_room_id,
|
||||
)
|
||||
@@ -84,6 +88,94 @@ class SyncRestServlet(RestServlet):
|
||||
self.filtering = hs.get_filtering()
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
requester = yield self.auth.get_user_by_req(
|
||||
request, allow_guest=True
|
||||
)
|
||||
user = requester.user
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
timeout = body.get("timeout", 0)
|
||||
since = body.get("since", None)
|
||||
|
||||
extras = body.get("extras", {})
|
||||
extras = SyncExtras(
|
||||
paginate=extras.get("paginate", {}),
|
||||
peek=extras.get("peek", {}),
|
||||
)
|
||||
|
||||
if "from" in body:
|
||||
# /events used to use 'from', but /sync uses 'since'.
|
||||
# Lets be helpful and whine if we see a 'from'.
|
||||
raise SynapseError(
|
||||
400, "'from' is not a valid parameter. Did you mean 'since'?"
|
||||
)
|
||||
|
||||
set_presence = body.get("set_presence", "online")
|
||||
if set_presence not in self.ALLOWED_PRESENCE:
|
||||
message = "Parameter 'set_presence' must be one of [%s]" % (
|
||||
", ".join(repr(v) for v in self.ALLOWED_PRESENCE)
|
||||
)
|
||||
raise SynapseError(400, message)
|
||||
|
||||
full_state = body.get("full_state", False)
|
||||
|
||||
filter_id = body.get("filter_id", None)
|
||||
filter_dict = body.get("filter", None)
|
||||
pagination_config = body.get("pagination_config", None)
|
||||
|
||||
if filter_dict is not None and filter_id is not None:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Can only specify one of `filter` and `filter_id` paramters"
|
||||
)
|
||||
|
||||
if filter_id:
|
||||
filter_collection = yield self.filtering.get_user_filter(
|
||||
user.localpart, filter_id
|
||||
)
|
||||
filter_key = filter_id
|
||||
elif filter_dict:
|
||||
self.filtering.check_valid_filter(filter_dict)
|
||||
filter_collection = FilterCollection(filter_dict)
|
||||
filter_key = json.dumps(filter_dict)
|
||||
else:
|
||||
filter_collection = DEFAULT_FILTER_COLLECTION
|
||||
filter_key = None
|
||||
|
||||
request_key = (user, timeout, since, filter_key, full_state)
|
||||
|
||||
sync_config = SyncConfig(
|
||||
user=user,
|
||||
filter_collection=filter_collection,
|
||||
is_guest=requester.is_guest,
|
||||
request_key=request_key,
|
||||
pagination_config=SyncPaginationConfig(
|
||||
order=pagination_config["order"],
|
||||
limit=pagination_config["limit"],
|
||||
tags=pagination_config.get("tags", SYNC_PAGINATION_TAGS_IGNORE),
|
||||
) if pagination_config else None,
|
||||
)
|
||||
|
||||
if since is not None:
|
||||
batch_token = SyncNextBatchToken.from_string(since)
|
||||
else:
|
||||
batch_token = None
|
||||
|
||||
sync_result = yield self._handle_sync(
|
||||
requester=requester,
|
||||
sync_config=sync_config,
|
||||
batch_token=batch_token,
|
||||
set_presence=set_presence,
|
||||
full_state=full_state,
|
||||
timeout=timeout,
|
||||
extras=extras,
|
||||
)
|
||||
|
||||
defer.returnValue(sync_result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request):
|
||||
if "from" in request.args:
|
||||
@@ -107,13 +199,6 @@ class SyncRestServlet(RestServlet):
|
||||
filter_id = parse_string(request, "filter", default=None)
|
||||
full_state = parse_boolean(request, "full_state", default=False)
|
||||
|
||||
logger.info(
|
||||
"/sync: user=%r, timeout=%r, since=%r,"
|
||||
" set_presence=%r, filter_id=%r" % (
|
||||
user, timeout, since, set_presence, filter_id
|
||||
)
|
||||
)
|
||||
|
||||
request_key = (user, timeout, since, filter_id, full_state)
|
||||
|
||||
if filter_id:
|
||||
@@ -136,15 +221,39 @@ class SyncRestServlet(RestServlet):
|
||||
filter_collection=filter,
|
||||
is_guest=requester.is_guest,
|
||||
request_key=request_key,
|
||||
pagination_config=None,
|
||||
)
|
||||
|
||||
if since is not None:
|
||||
since_token = StreamToken.from_string(since)
|
||||
batch_token = SyncNextBatchToken.from_string(since)
|
||||
else:
|
||||
since_token = None
|
||||
batch_token = None
|
||||
|
||||
sync_result = yield self._handle_sync(
|
||||
requester=requester,
|
||||
sync_config=sync_config,
|
||||
batch_token=batch_token,
|
||||
set_presence=set_presence,
|
||||
full_state=full_state,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
defer.returnValue(sync_result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_sync(self, requester, sync_config, batch_token, set_presence,
|
||||
full_state, timeout, extras=DEFAULT_SYNC_EXTRAS):
|
||||
affect_presence = set_presence != PresenceState.OFFLINE
|
||||
|
||||
user = sync_config.user
|
||||
|
||||
logger.info(
|
||||
"/sync: user=%r, timeout=%r, since=%r,"
|
||||
" set_presence=%r" % (
|
||||
user, timeout, batch_token, set_presence
|
||||
)
|
||||
)
|
||||
|
||||
if affect_presence:
|
||||
yield self.presence_handler.set_state(user, {"presence": set_presence})
|
||||
|
||||
@@ -153,8 +262,8 @@ class SyncRestServlet(RestServlet):
|
||||
)
|
||||
with context:
|
||||
sync_result = yield self.sync_handler.wait_for_sync_for_user(
|
||||
sync_config, since_token=since_token, timeout=timeout,
|
||||
full_state=full_state
|
||||
sync_config, batch_token=batch_token, timeout=timeout,
|
||||
full_state=full_state, extras=extras,
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
@@ -182,8 +291,15 @@ class SyncRestServlet(RestServlet):
|
||||
"leave": archived,
|
||||
},
|
||||
"next_batch": sync_result.next_batch.to_string(),
|
||||
"unread_notifications": sync_result.unread_notifications,
|
||||
}
|
||||
|
||||
if sync_result.errors:
|
||||
response_content["rooms"]["errors"] = self.encode_errors(sync_result.errors)
|
||||
|
||||
if sync_result.pagination_info:
|
||||
response_content["pagination_info"] = sync_result.pagination_info
|
||||
|
||||
defer.returnValue((200, response_content))
|
||||
|
||||
def encode_presence(self, events, time_now):
|
||||
@@ -194,6 +310,15 @@ class SyncRestServlet(RestServlet):
|
||||
formatted.append(event)
|
||||
return {"events": formatted}
|
||||
|
||||
def encode_errors(self, errors):
|
||||
return {
|
||||
e.room_id: {
|
||||
"errcode": e.errcode,
|
||||
"error": e.error
|
||||
}
|
||||
for e in errors
|
||||
}
|
||||
|
||||
def encode_joined(self, rooms, time_now, token_id):
|
||||
"""
|
||||
Encode the joined rooms in a sync result
|
||||
@@ -215,6 +340,7 @@ class SyncRestServlet(RestServlet):
|
||||
joined[room.room_id] = self.encode_room(
|
||||
room, time_now, token_id
|
||||
)
|
||||
joined[room.room_id]["synced"] = room.synced
|
||||
|
||||
return joined
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
# Remember to update this number every time a change is made to database
|
||||
# schema files, so the users will be informed on server restarts.
|
||||
SCHEMA_VERSION = 32
|
||||
SCHEMA_VERSION = 33
|
||||
|
||||
dir_path = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
||||
24
synapse/storage/schema/delta/33/tag_changes.sql
Normal file
24
synapse/storage/schema/delta/33/tag_changes.sql
Normal file
@@ -0,0 +1,24 @@
|
||||
/* Copyright 2016 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
CREATE TABLE room_tags_change_revisions(
|
||||
user_id TEXT NOT NULL,
|
||||
room_id TEXT NOT NULL,
|
||||
stream_id BIGINT NOT NULL,
|
||||
change TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX room_tags_change_revisions_rm_idx ON room_tags_change_revisions(user_id, room_id, stream_id);
|
||||
CREATE INDEX room_tags_change_revisions_idx ON room_tags_change_revisions(user_id, stream_id);
|
||||
@@ -525,6 +525,36 @@ class StreamStore(SQLBaseStore):
|
||||
int(stream),
|
||||
)
|
||||
|
||||
def get_last_event_id_ts_for_room(self, room_id, token):
|
||||
"""Get the latest event_id and origin_server_ts for a room_id before a
|
||||
given token.
|
||||
|
||||
Args:
|
||||
room_id (str)
|
||||
token (str)
|
||||
|
||||
Returns:
|
||||
Dictionary with ``event_id`` and ``origin_server_ts`` keys.
|
||||
"""
|
||||
stream_ordering = RoomStreamToken.parse_stream_token(token).stream
|
||||
|
||||
sql = (
|
||||
"SELECT event_id, origin_server_ts FROM events"
|
||||
" WHERE room_id = ? AND stream_ordering <= ?"
|
||||
" ORDER BY topological_ordering DESC, stream_ordering DESC"
|
||||
" LIMIT 1"
|
||||
)
|
||||
|
||||
def f(txn):
|
||||
txn.execute(sql, (room_id, stream_ordering))
|
||||
rows = self.cursor_to_dict(txn)
|
||||
if rows:
|
||||
return rows[0]
|
||||
else:
|
||||
return None
|
||||
|
||||
return self.runInteraction("get_last_event_id_ts_for_room", f)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_events_around(self, room_id, event_id, before_limit, after_limit):
|
||||
"""Retrieve events and pagination tokens around a given event in a
|
||||
|
||||
@@ -17,12 +17,18 @@ from ._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cached
|
||||
from twisted.internet import defer
|
||||
|
||||
from collections import Counter
|
||||
|
||||
import ujson as json
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
TAG_CHANGE_NEWLY_TAGGED = "newly_tagged"
|
||||
TAG_CHANGE_ALL_REMOVED = "all_removed"
|
||||
|
||||
|
||||
class TagsStore(SQLBaseStore):
|
||||
def get_max_account_data_stream_id(self):
|
||||
"""Get the current max stream id for the private user data stream
|
||||
@@ -170,6 +176,45 @@ class TagsStore(SQLBaseStore):
|
||||
row["tag"]: json.loads(row["content"]) for row in rows
|
||||
})
|
||||
|
||||
def get_room_tags_changed(self, user_id, stream_id, now_id):
|
||||
"""Returns the rooms that have been newly tagged or had all their tags
|
||||
removed since `stream_id`.
|
||||
|
||||
Collapses multiple changes into one. For example, if a room has gone
|
||||
from untagged to tagged back to untagged, the room_id won't be returned.
|
||||
"""
|
||||
changed = self._account_data_stream_cache.has_entity_changed(
|
||||
user_id, int(stream_id)
|
||||
)
|
||||
|
||||
if not changed:
|
||||
return {}
|
||||
|
||||
def _get_room_tags_changed(txn):
|
||||
txn.execute(
|
||||
"SELECT room_id, change FROM room_tags_change_revisions"
|
||||
" WHERE user_id = ? AND stream_id > ? AND stream_id <= ?",
|
||||
(user_id, stream_id, now_id)
|
||||
)
|
||||
|
||||
results = Counter()
|
||||
|
||||
for room_id, change in txn.fetchall():
|
||||
if change == TAG_CHANGE_NEWLY_TAGGED:
|
||||
results[room_id] += 1
|
||||
elif change == TAG_CHANGE_ALL_REMOVED:
|
||||
results[room_id] -= 1
|
||||
else:
|
||||
logger.warn("Unexpected tag change: %r", change)
|
||||
|
||||
return {
|
||||
room_id: TAG_CHANGE_NEWLY_TAGGED if count > 0 else TAG_CHANGE_ALL_REMOVED
|
||||
for room_id, count in results.items()
|
||||
if count
|
||||
}
|
||||
|
||||
return self.runInteraction("get_room_tags_changed", _get_room_tags_changed)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_tag_to_room(self, user_id, room_id, tag, content):
|
||||
"""Add a tag to a room for a user.
|
||||
@@ -184,6 +229,12 @@ class TagsStore(SQLBaseStore):
|
||||
content_json = json.dumps(content)
|
||||
|
||||
def add_tag_txn(txn, next_id):
|
||||
txn.execute(
|
||||
"SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?",
|
||||
(user_id, room_id),
|
||||
)
|
||||
existing_tags, = txn.fetchone()
|
||||
|
||||
self._simple_upsert_txn(
|
||||
txn,
|
||||
table="room_tags",
|
||||
@@ -197,6 +248,17 @@ class TagsStore(SQLBaseStore):
|
||||
}
|
||||
)
|
||||
self._update_revision_txn(txn, user_id, room_id, next_id)
|
||||
if not existing_tags:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="room_tags_change_revisions",
|
||||
values={
|
||||
"user_id": user_id,
|
||||
"room_id": room_id,
|
||||
"stream_id": next_id,
|
||||
"change": TAG_CHANGE_NEWLY_TAGGED,
|
||||
}
|
||||
)
|
||||
|
||||
with self._account_data_id_gen.get_next() as next_id:
|
||||
yield self.runInteraction("add_tag", add_tag_txn, next_id)
|
||||
@@ -218,6 +280,24 @@ class TagsStore(SQLBaseStore):
|
||||
" WHERE user_id = ? AND room_id = ? AND tag = ?"
|
||||
)
|
||||
txn.execute(sql, (user_id, room_id, tag))
|
||||
if txn.rowcount > 0:
|
||||
txn.execute(
|
||||
"SELECT count(*) FROM room_tags WHERE user_id = ? AND room_id = ?",
|
||||
(user_id, room_id),
|
||||
)
|
||||
existing_tags, = txn.fetchone()
|
||||
if not existing_tags:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="room_tags_change_revisions",
|
||||
values={
|
||||
"user_id": user_id,
|
||||
"room_id": room_id,
|
||||
"stream_id": next_id,
|
||||
"change": TAG_CHANGE_ALL_REMOVED,
|
||||
}
|
||||
)
|
||||
|
||||
self._update_revision_txn(txn, user_id, room_id, next_id)
|
||||
|
||||
with self._account_data_id_gen.get_next() as next_id:
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.types import StreamToken
|
||||
from synapse.types import StreamToken, SyncNextBatchToken
|
||||
|
||||
import logging
|
||||
|
||||
@@ -72,14 +72,18 @@ class PaginationConfig(object):
|
||||
if direction not in ['f', 'b']:
|
||||
raise SynapseError(400, "'dir' parameter is invalid.")
|
||||
|
||||
from_tok = get_param("from")
|
||||
raw_from_tok = get_param("from")
|
||||
to_tok = get_param("to")
|
||||
|
||||
try:
|
||||
if from_tok == "END":
|
||||
from_tok = None
|
||||
if raw_from_tok == "END":
|
||||
from_tok = None # For backwards compat.
|
||||
elif from_tok:
|
||||
from_tok = StreamToken.from_string(from_tok)
|
||||
elif raw_from_tok:
|
||||
try:
|
||||
from_tok = SyncNextBatchToken.from_string(raw_from_tok).stream_token
|
||||
except:
|
||||
from_tok = StreamToken.from_string(raw_from_tok)
|
||||
except:
|
||||
raise SynapseError(400, "'from' paramater is invalid")
|
||||
|
||||
|
||||
@@ -17,6 +17,9 @@ from synapse.api.errors import SynapseError
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
from unpaddedbase64 import encode_base64, decode_base64
|
||||
import cbor2 as serializer
|
||||
|
||||
|
||||
Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
|
||||
|
||||
@@ -115,8 +118,71 @@ class EventID(DomainSpecificString):
|
||||
SIGIL = "$"
|
||||
|
||||
|
||||
class SyncNextBatchToken(
|
||||
namedtuple("SyncNextBatchToken", (
|
||||
"stream_token",
|
||||
"pagination_state",
|
||||
))
|
||||
):
|
||||
@classmethod
|
||||
def from_string(cls, string):
|
||||
try:
|
||||
d = serializer.loads(decode_base64(string))
|
||||
pa = d.get("pa", None)
|
||||
if pa:
|
||||
pa = SyncPaginationState.from_dict(pa)
|
||||
return cls(
|
||||
stream_token=StreamToken.from_arr(d["t"]),
|
||||
pagination_state=pa,
|
||||
)
|
||||
except:
|
||||
raise SynapseError(400, "Invalid Token")
|
||||
|
||||
def to_string(self):
|
||||
return encode_base64(serializer.dumps({
|
||||
"t": self.stream_token.to_arr(),
|
||||
"pa": self.pagination_state.to_dict() if self.pagination_state else None,
|
||||
}))
|
||||
|
||||
def replace(self, **kwargs):
|
||||
return self._replace(**kwargs)
|
||||
|
||||
|
||||
_ORDER_ENCODE = {"m.origin_server_ts": "o"}
|
||||
_ORDER_DECODE = {v: k for k, v in _ORDER_ENCODE.items()}
|
||||
_TAG_ENCODE = {"m.include_all": "i", "m.ignore": "x"}
|
||||
_TAG_DECODE = {v: k for k, v in _TAG_ENCODE.items()}
|
||||
|
||||
|
||||
class SyncPaginationState(
|
||||
namedtuple("SyncPaginationState", (
|
||||
"order",
|
||||
"value",
|
||||
"limit",
|
||||
"tags",
|
||||
))
|
||||
):
|
||||
@classmethod
|
||||
def from_dict(cls, d):
|
||||
try:
|
||||
return cls(_ORDER_DECODE[d["o"]], d["v"], d["l"], _TAG_DECODE[d["t"]])
|
||||
except:
|
||||
raise SynapseError(400, "Invalid Token")
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"o": _ORDER_ENCODE[self.order],
|
||||
"v": self.value,
|
||||
"l": self.limit,
|
||||
"t": _TAG_ENCODE[self.tags],
|
||||
}
|
||||
|
||||
def replace(self, **kwargs):
|
||||
return self._replace(**kwargs)
|
||||
|
||||
|
||||
class StreamToken(
|
||||
namedtuple("Token", (
|
||||
namedtuple("StreamToken", (
|
||||
"room_key",
|
||||
"presence_key",
|
||||
"typing_key",
|
||||
@@ -141,6 +207,20 @@ class StreamToken(
|
||||
def to_string(self):
|
||||
return self._SEPARATOR.join([str(k) for k in self])
|
||||
|
||||
@classmethod
|
||||
def from_arr(cls, arr):
|
||||
try:
|
||||
keys = arr
|
||||
while len(keys) < len(cls._fields):
|
||||
# i.e. old token from before receipt_key
|
||||
keys.append("0")
|
||||
return cls(*keys)
|
||||
except:
|
||||
raise SynapseError(400, "Invalid Token")
|
||||
|
||||
def to_arr(self):
|
||||
return self
|
||||
|
||||
@property
|
||||
def room_stream_id(self):
|
||||
# TODO(markjh): Awful hack to work around hacks in the presence tests
|
||||
|
||||
Reference in New Issue
Block a user